How-to: Do Near-Real Time Sessionization with Spark Streaming and Apache Hadoop

Categories: How-to, Spark, Use Case

This Spark Streaming use case is a great example of how near-real-time processing can be brought to Hadoop.

Spark Streaming is one of the most interesting components within the Apache Spark stack. With Spark Streaming, you can create data pipelines that process streamed data using the same API that you use for processing batch-loaded data. Furthermore, Spark Streaming’s “micro-batching” approach provides decent resiliency should a job fail for some reason.

In this post, I will demonstrate and walk you through some common and advanced Spark Streaming functionality via the use case of doing near-real-time sessionization of Website events, then loading stats about that activity into Apache HBase, and then populating graphs in your preferred BI tool for analysis. (Sessionization refers to the capture of all clickstream activity within the timeframe of a single visitor’s Website session.) You can find the code for this demo here.

A system like this one can be super-useful for understanding visitor behavior (whether human or machine). With some additional work, it can also be extended with windowing patterns to detect possible fraud in an asynchronous manner.

Spark Streaming Code

The main class to look at in our example is:

com.cloudera.sa.example.sparkstreaming.sessionization.SessionizeData

Let’s look at this code in sections (ignoring lines 1-59, which contain imports and other uninteresting stuff).

Lines 60 to 112: Setting up Spark Streaming

These lines are a fairly basic Spark Streaming setup, with an option to receive data either from HDFS or from a socket. I’ve added some verbose comments to help you understand the code if you are new to Spark Streaming. (I’m not going to go into great detail here because we’re still in the boilerplate-code zone.)
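
If you don’t have the repository in front of you, a minimal sketch of that setup looks roughly like the following. The 10-second batch interval, the checkpoint directory, and the argument handling are assumptions for illustration; the real SessionizeData class drives these from its command-line arguments.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SessionizeDataSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SessionizeData")

    // A 10-second micro-batch interval (assumed; the real job takes this from its arguments).
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Checkpointing is required for stateful operations such as updateStateByKey.
    ssc.checkpoint("hdfs:///tmp/sessionize-checkpoint")

    // Receive data either from a socket or from new files landing in an HDFS directory.
    val lines = if (args(0) == "socket") {
      ssc.socketTextStream(args(1), args(2).toInt)
    } else {
      ssc.textFileStream(args(1))
    }

    // ... parsing, sessionization, counting, and HBase/HDFS output go here ...

    ssc.start()
    ssc.awaitTermination()
  }
}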

Lines 114 to 124: String Parsing

Here’s where the Spark Streaming work begins. Look at the following four lines:
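
The original snippet isn’t reproduced inline here, so below is a minimal sketch of what those lines do, assuming an Apache-style access log where the IP address is the first field and the timestamp sits between square brackets. The field positions and date format are illustrative, not the exact ones from the repository.

// Parse each raw log line into (ipAddress, (timestamp, eventBody)).
val ipKeyLines = lines.map { line =>
  // Assumed Apache-style layout: IP is the first field, timestamp between "[" and "]".
  val dateFormat = new java.text.SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z")
  val ip = line.substring(0, line.indexOf(" "))
  val rawTime = line.substring(line.indexOf("[") + 1, line.indexOf("]"))
  val time = dateFormat.parse(rawTime).getTime // epoch milliseconds
  (ip, (time, line))                           // keep the full line as the event body
}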

The first command above is doing a map function on the “lines” DStream object and parsing the original events to separate out the IP address, timestamp, and event body. For those new to Spark Streaming, a DStream holds a batch of records to be processed. These records are populated by the receiver object, which was defined previously, and this map function produces another DStream within this micro-batch to store the transformed records for additional processing.

There are a couple of things to note when looking at a Spark Streaming diagram like the one above:

  • Each micro-batch fires at the interval (in seconds) defined when constructing your StreamingContext
  • The receiver is always populating the RDDs for the next micro-batch
  • Older RDDs from past micro-batches are cleaned up and discarded

Lines 126 to 135: Making Sessions

Now that we have the IP address and times broken out from the web log, it’s time to build sessions. The following code does the session building by first clumping events within the micro-batch, and then reducing those clumps with the sessions held in the stateful DStream.
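
As a rough sketch of that first step (the three-element tuple of start time, end time, and event count is an assumption standing in for the repository’s own session type):

// Within a single micro-batch, collapse all events for an IP address into one
// "session range": earliest time seen, latest time seen, and an event count.
val latestSessionInfo = ipKeyLines
  .map { case (ip, (time, _)) => (ip, (time, time, 1L)) }
  .reduceByKey { (a, b) =>
    (math.min(a._1, b._1), // session start: earliest timestamp in the batch
     math.max(a._2, b._2), // session end: latest timestamp in the batch
     a._3 + b._3)          // number of events in this micro-batch
  }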

Here’s an example of how the records will be reduced within the micro-batch:  

With the session ranges reduced within the micro-batch, we can use the super-cool updateStateByKey functionality, which will do a join/reduce-like operation with a DStream from the micro-batch before the active one. The diagram below illustrates how this process looks in terms of DStreams over time.

Now let’s dig into the updateStatbyOfSessions function, which is defined at the bottom of the file. This code (note the verbose comments) contains a lot of the magic that makes sessionization happen in a continuous, micro-batch mode.
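
Because the full function is too long to reproduce here, the following is a heavily simplified sketch of what such an update function can look like. The real updateStatbyOfSessions tracks more state (for example, whether a session is new, revisited, or about to die), and the 30-second timeout and three-element state tuple below are assumptions.

// State kept per IP address: (sessionStart, sessionEnd, eventCount).
// newBatch holds the per-micro-batch session ranges produced above.
val SESSION_TIMEOUT = 30000L // 30 seconds of inactivity ends a session (assumed value)

def updateStatbyOfSessionsSketch(
    newBatch: Seq[(Long, Long, Long)],
    state: Option[(Long, Long, Long)]): Option[(Long, Long, Long)] = {

  // Merge everything seen in this micro-batch with the existing session state.
  val merged = (state.toSeq ++ newBatch).reduceOption { (a, b) =>
    (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3)
  }

  merged match {
    // No new activity and the old session has gone quiet: drop the state.
    case Some((_, end, _))
      if newBatch.isEmpty && System.currentTimeMillis() - end > SESSION_TIMEOUT => None
    case other => other
  }
}

val latestSessions = latestSessionInfo.updateStateByKey(updateStatbyOfSessionsSketch _)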

There’s a lot going on in this code, and in many ways, it’s the most complex part of the whole job. To summarize, it tracks active sessions so you know if you are continuing an existing session or starting a new one.

Lines 126 to 207: Counting and HBase

This section is where most of the counting happens. There is a lot of repetition here, so let’s walk through just one count example and then the steps that will allow us to put the generated counts in the same record for storage in HBase.  
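
Here is a hedged sketch of one such count, in this case the number of currently active sessions. The stat name and the use of a single-entry mutable HashMap are illustrative; the repository defines its own key constants.

import scala.collection.mutable

// Keep only the sessions with recent activity, count them, and wrap the result
// in a single-entry HashMap so it can later be merged with the other counts.
val totalActiveSessionCount = latestSessions
  .filter { case (_, (_, sessionEnd, _)) =>
    System.currentTimeMillis() - sessionEnd < 30000L // "active" = seen within the session timeout (assumed)
  }
  .count()
  .map(count => mutable.HashMap[String, Long]("TOTAL_ACTIVE_SESSION_COUNT" -> count))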

In short, the code above is filtering out all but the active sessions, counting them, and putting that final count record into a single-entry HashMap. It uses the HashMap as a container so that, after all the counts are done, we can call the following reduce function to put them all into a single record. (I’m sure there are better ways to do that, but this approach works just fine.)

Next, the following code takes all those HashMaps and puts all their values in one HashMap.
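
Roughly, that merge can look like the sketch below. The sibling count DStreams named here are hypothetical; they stand in for the other counts built the same way as the one above.

// Union the per-stat DStreams and fold every single-entry HashMap into one
// HashMap per micro-batch, so a single record carries all of the stats.
val allCounts = totalActiveSessionCount
  .union(newSessionCount)  // hypothetical: sessions that started in this micro-batch
  .union(deadSessionCount) // hypothetical: sessions that timed out in this micro-batch
  .reduce((a, b) => a ++= b)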

Interacting with HBase through Spark Streaming is super simple with HBaseContext. All you have to do is supply the DStream holding the HashMap along with a function that converts it to a Put object.
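
With the cloudera-labs SparkOnHBase project, that call looks roughly like the sketch below. The table name, column family, and row-key scheme are made up for illustration, and the exact streamBulkPut signature may vary between SparkOnHBase versions.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import com.cloudera.spark.hbase.HBaseContext
import scala.collection.mutable

val hbaseContext = new HBaseContext(ssc.sparkContext, HBaseConfiguration.create())

// Turn each merged HashMap of counts into a single HBase Put, keyed by the
// micro-batch timestamp, with one column per stat.
hbaseContext.streamBulkPut[mutable.HashMap[String, Long]](
  allCounts,
  "sessionization_counts", // hypothetical table name
  (counts) => {
    val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
    counts.foreach { case (stat, value) =>
      put.add(Bytes.toBytes("c"), Bytes.toBytes(stat), Bytes.toBytes(value))
    }
    put
  },
  false) // autoFlush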

Now, with this information in HBase, we can wrap it with an Apache Hive table and then execute a query through your favorite BI tool to get graphs like the following, which will refresh on every micro-batch.

Lines 209 to 215: Writing to HDFS

The final task is to join the active session information with the event data and then persist the events to HDFS with the starting time of the session.  
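
A sketch of that last step, assuming the parsed events and stateful sessions built above (the output path and the choice of saveAsTextFiles are illustrative):

// Tag every event with the start time of the session it belongs to, then
// persist the result to HDFS, one directory per micro-batch.
ipKeyLines
  .join(latestSessions) // (ip, ((eventTime, eventBody), (sessionStart, sessionEnd, count)))
  .map { case (_, ((_, eventBody), (sessionStart, _, _))) =>
    sessionStart + "\t" + eventBody // prefix the raw event with its session start time
  }
  .saveAsTextFiles("hdfs:///user/demo/sessionized-events")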

Conclusion

I hope you come away from this example feeling like a lot of work was done with just a little bit of code, because it was. Imagine what else you can do with this pattern and the ability to interact with HBase and HDFS so easily within Spark Streaming.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Apache Spark, and a co-author of the O’Reilly book Hadoop Application Architectures.
 
 


12 responses on “How-to: Do Near-Real Time Sessionization with Spark Streaming and Apache Hadoop”

  1. Otis Gospodnetic

    Thanks for a good overview and the code, Ted.
    Can you quantify the *Near* Real-time a bit?
    Imagine you are getting 100K events per second here.
    Would “Near” be 1 sec or 5 seconds or closer to a minute? How much RAM & CPU might one need for this?

    I know there is a lot of “it depends”, but if you can share some scenarios or factors or ranges of numbers, that would be very helpful, too.

    Thanks.

  2. Ted Malaska

    Hey Otis,

    I was testing it with around 10k per batch and the batch would finish within a couple of seconds.

    This was more just an example than full-out scale. I only tested it on 3 nodes with only 10GB each, so really not the HW you want if you were going to do this for real.

    But to answer your question about 100k a second: normally Spark Streaming is talked about in terms of a couple of seconds, so I would think 1-3. But remember, sessionization is a very partitionable problem, so I don’t see too much trouble in scaling this out pretty large. I would also recommend some tuning to the code I wrote; again, this was more for teaching, not high load.

  3. Buntu

    Thanks for this tutorial.. really helpful.

    I couldn’t find a sample of the input log that is being parsed.. can someone point to a sample?

  4. Mohit

    Hi Ted,
    Thanks for the post. I have a question around using Spark Streaming with Kafka using the Java API. Firstly, I can’t find any good tutorial or example that shows how to use the Java API for Spark Streaming.

    Secondly, in my use case I need to do some real-time processing, like updating counts of various parameters and grouping by some. Also, I want to store that data into HDFS so that it can be queried later (either by Hive/Pig or MR jobs). And I want to store it in a way so that my messages are partitioned in a folder structure like /year/month/day/hour/messages.txt.

    Can my use case be satisfied using the approach above? Like first reading the data from Spark streams, updating a DB, and then storing it in HDFS? Also, our streaming intervals would preferably be small, whereas I would want an hourly or half-hourly file to be dumped into HDFS. Could you please let me know through an example how you would do something like that?

  5. Ted M

    Hey Mohit,

    Java examples. Yes, they are hard to come by. I have one in my SparkOnHBase example https://github.com/cloudera-labs/SparkOnHBase

    As for setting this up with Hive or Impala: you would write to HDFS with Parquet. I would set the Spark Streaming iteration time large enough that you have enough data to fill a 256MB file on HDFS. Then you write that file to HDFS and put it into the Hive table. If you use Kite, it will take care of the partitioning for you.

    Also, I would set up a time, once a day or more, to do a compaction, to get fewer files and better compression.

  6. JanX

    Hi Ted

    Thanks a lot for this insightful post. If I understood correctly, transformations may be recomputed (e.g., when tasks or job stages need to be re-executed).
    For this very reason, usage of Accumulators is discouraged in transformations ..
    Also, it is assumed that the system clock is called every 10 seconds in updateStatbyOfSessions, and this might not be a valid assumption, for similar reasons. …
    Or maybe these edge cases happen so rarely that we don’t care at all …

  7. David

    Hi Ted

    Is there any way the state’s life/resources can be managed? Or is it always growing?

    For example, in your sessionization example, the state will grow forever, right? Is there any way to manage that so you can purge/archive and keep only 3 months of aggregation data or something?

    Cheers
    David

  8. Mayur Patel

    Hey Ted,

    I am working on a use case where we have to fetch data from 3 different tables. We are using Spark Streaming with HBase, reading data from 3 different tables and dumping the output to an HBase table.

    Do you have any sample code like this or any other reference? If I want to use the Spark HBaseContext in this case, how should I implement this method? Couldn’t find any good resource or tutorial online.

    Please respond at the earliest.

    Thanks..!

  9. Noam Ben-Ami

    Thank you for writing this article. There is one rather dangerous piece of code here that you should reconsider: The use of System.currentTimeMillis. In the case of replays of events from queues, which happens in the real world, the use of the current time may cause sessions to expire immediately. Consider instead using the transform function that provides an RDD and a Time. This will allow for safe re-execution of old streams.

  10. alchemist

    Is this code supported on the latest version of Spark, 1.5? I somehow cannot find any Maven repository to retrieve this new API.

  11. alchemist

    The question is what Maven repository to include to get JavaHBaseContext? The example has code to integrate, but it looks like it is a year old and the JIRA has been checked in. What Maven resources do I add to get the HBaseContext mentioned in the document?