Thanks to Sam Shuster, Software Engineer at Edmunds.com, for the guest post below about his company’s use case for Spark Streaming, SparkOnHBase, and Morphlines.
Every year, the Super Bowl brings parties, food, and hopefully a great game to appease everyone's football appetite until the fall. For an event that draws around 114 million viewers, with audiences growing each year, Americans have also grown accustomed to commercials with production budgets on par with television shows and entertainment value that tries to rival the game itself.
Some of the big spenders every year are car manufacturers. Edmunds.com is a car shopping website where people from all across the nation come to read reviews, compare prices, and in general get help in all matters car related. Because of Edmunds’ place as a destination for automobile news and facts, Super Bowl car commercials do indeed mean traffic spikes to make and model specific pages.
For the last few years, business analysts at Edmunds have used this unique opportunity of unfettered advertising cause and effect as a way to create awareness by sending out updates during the game to interested parties concerning how the commercials have affected site traffic for particular make and model pages. Unfortunately, in the past, these updates have been restricted to hourly granularity, with an additional hour of delay. Furthermore, as this data was not available in an easy-to-use dashboard, manual processing was needed to visualize it.
As our team began to transition from MapReduce to Apache Spark, we saw this use case as a perfect opportunity to explore a solution via Spark Streaming. The goal was to build a near real-time dashboard that would provide both unique visitor and page view counts per make and make/model that could be engineered in a couple of weeks.
Here is our prototype architecture (by no means optimal): it takes Apache web server log lines describing user events, aggregates them by visitor id and make/model page to get unique-visitor and page-view counts, and finally outputs this information to be visualized by a near real-time dashboard.
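In plain terms, each micro-batch boils down to grouping parsed log events by page and counting views and distinct visitors. Here is a minimal, non-Spark Python sketch of that per-batch aggregation (field names such as `visitor_id`, `make`, and `model` are illustrative, not our actual log schema):

```python
from collections import defaultdict

def aggregate_batch(log_events):
    """Aggregate one micro-batch of parsed log events into
    page-view and unique-visitor counts per (make, model) page."""
    page_views = defaultdict(int)
    visitors = defaultdict(set)
    for event in log_events:
        key = (event["make"], event.get("model"))  # model may be absent on make pages
        page_views[key] += 1
        visitors[key].add(event["visitor_id"])
    unique_visitors = {k: len(v) for k, v in visitors.items()}
    return page_views, unique_visitors

batch = [
    {"visitor_id": "v1", "make": "Lexus", "model": "RX"},
    {"visitor_id": "v2", "make": "Lexus", "model": "RX"},
    {"visitor_id": "v1", "make": "Lexus", "model": "RX"},  # repeat visit, same visitor
    {"visitor_id": "v3", "make": "Kia", "model": "Sorento"},
]
pv, uv = aggregate_batch(batch)
# ("Lexus", "RX") has three page views but only two unique visitors
```

In the real job this grouping runs as Spark transformations over each micro-batch RDD, but the counting logic is the same.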
As we were already using Apache Flume for ingesting our logs, using a Flume Spark Streaming sink was an obvious choice. In addition to being more reliable, the polling sink does not require the Flume agent to be restarted every time the Spark Streaming job is restarted.
Unfortunately, due to the limitations of our media-tier Flume agent configuration, which currently spools the log files at five-minute intervals, we are limited to a streaming micro-batch size of five minutes as well. Eventually, the goal is to change the production media-tier Flume agents to tail the logs so that data flows at a constant rate. Every other stage of our architecture could easily handle a batch interval of 30 seconds or less; the only reason we have not broken five minutes is that our team does not control this Flume agent's configuration.
Why Spark Streaming?
While we have had success using a Flume Solr sink combined with custom Morphlines to go directly from Flume to Apache Solr for log lines without aggregates, here we needed something that could perform complicated aggregations quickly, which is why Spark Streaming was necessary.
Why HBase and Lily?
At this point, as the end goal for this application was Banana (the dashboard tool that reads directly from Solr) you might wonder why we decided to include Apache HBase and Lily HBase Indexer as added complications to an already fairly lengthy pipeline. There were a couple of reasons:
- The existence of the Cloudera Labs project, SparkOnHBase. (This blog post explains more about the project and how to incorporate it into your Spark jobs.) This library provides an easy-to-use interface for connecting Spark batch and streaming jobs to HBase. Writing directly to Solr would have required an entirely new library, with functionality very similar to what already exists in the SparkOnHBase project.
- Our existing processing ecosystem features HBase as an important data sink/source and ideally we would want streaming data to be available in HBase for other processes.
- Having HBase as an intermediate data store means that we have more flexibility if we ever decide to change our front-end dashboard.
- Finally, were Solr to crash, HBase has all the data replicated and Lily could be used to repopulate Solr.
Why Solr and Banana?
A main reason for including Solr is that it exposes the data through a REST interface and queries, making it easily accessible to others.
As for dashboards, we did briefly consider other tools like Graphite but found that for this initial prototype that the flexibility, ease of use, and customizability of Banana was perfect for our use case and lack of expertise in the area. Plus, Banana is free.
We want to calculate two different metrics:
- Page View Counts
- Unique Visitor Counts
We want to compute the above metrics for:
- Every make
- Every model
Finally, we want to have two time windows:
- Cumulative count that refreshes at midnight eastern time
- Count that is on the scale of the micro batch size of the streaming process
While Edmunds will eventually want to aggregate data using windowing functions to obtain counts for other time periods (hourly, for example), for this prototype we restricted ourselves to per-micro-batch aggregations and a cumulative count computed with the updateStateByKey function. While the cumulative page-view statistic is trivially the sum of the micro-batch values, unique visitors require that the cumulative count be computed separately.
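The updateStateByKey pattern can be pictured as a fold over batches: for each key, Spark hands an update function the new values from the current batch together with the state carried over from earlier batches. A hedged Python analogue for the cumulative page-view count (a sketch of the pattern, not our Spark code):

```python
def update_state(new_values, prev_state):
    """Mimics Spark's updateStateByKey contract: combine this batch's
    values with the running total carried over from earlier batches."""
    return (prev_state or 0) + sum(new_values)

def run_batches(batches):
    """Drive the update function over a sequence of micro-batches,
    where each batch maps a key to its list of counts."""
    state = {}
    for batch in batches:
        for key, values in batch.items():
            state[key] = update_state(values, state.get(key))
    return state

totals = run_batches([
    {"Lexus": [5, 3]},
    {"Lexus": [2], "Kia": [7]},
])
# totals == {"Lexus": 10, "Kia": 7}
```

In Spark, the state would be checkpointed and reset at midnight for the daily cumulative window.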
Saving every visitor to determine uniqueness over a 24-hour period would be resource intensive, so we decided to use the Twitter Algebird implementation of an approximate streaming algorithm called HyperLogLog. The state stored by the DStream call is thus the HLL object itself, which represents both an approximate set of the visitors seen so far and the cardinality of that set. For those of you who want to know how close the approximation came to the actual data: for a 24-bit HLL we had an average error of 0.016% and a standard deviation of 0.106%, so it performed very well while taking up a small, fixed memory cost for each make and make/model count.
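HyperLogLog keeps only a small array of registers per key: each visitor id is hashed, a few bits of the hash pick a register, and the register remembers the longest run of low zero bits observed. Here is a toy Python version to show the idea (the production job uses Algebird's HLL monoid, which adds bias corrections and supports merging across batches):

```python
import hashlib
import math

class ToyHLL:
    """Toy HyperLogLog sketch for illustration only."""
    def __init__(self, p=10):
        self.p = p
        self.m = 1 << p                  # number of registers
        self.registers = [0] * self.m
        self.alpha = 0.7213 / (1 + 1.079 / self.m)

    def add(self, item):
        h = int(hashlib.sha1(item.encode()).hexdigest(), 16)
        idx = h & (self.m - 1)           # low p bits choose a register
        w = h >> self.p
        rank = 1
        while w & 1 == 0 and rank <= 64: # length of zero run (+1) in remaining bits
            rank += 1
            w >>= 1
        self.registers[idx] = max(self.registers[idx], rank)

    def cardinality(self):
        est = self.alpha * self.m ** 2 / sum(2.0 ** -r for r in self.registers)
        zeros = self.registers.count(0)
        if est <= 2.5 * self.m and zeros:      # small-range (linear counting) correction
            est = self.m * math.log(self.m / zeros)
        return est

hll = ToyHLL()
for i in range(10000):
    hll.add("visitor-%d" % i)
est = hll.cardinality()   # close to 10,000, using only 1,024 registers
```

The appeal for streaming is that two HLLs can be merged by taking the register-wise maximum, which is exactly what a cumulative updateStateByKey needs.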
Here is an example flow of the aggregation of the unique visitors metrics, which ends with outputting two DStreams for the two different time windows:
Working with HBase
While performing scans on HBase is easy using the Spark API, doing Puts, Gets, and other useful operations is much trickier. The SparkOnHBase library provides a simple API that abstracts much of the lower-level work required for those operations. This specific streaming job uses the streamBulkPut method. For every count, we put a row key composed of the make, model, data point, aggregation type, and timestamp. Including the timestamp in the row key itself means every value is written to its own row without relying on versioning. Finally, we also put these values under different qualifiers for each of the attributes so that the Lily Indexer Morphline can easily transform them into Solr fields without having to parse the row key.
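Concretely, here is a hedged sketch of that row-key and column layout (the delimiter, field order, and aggregation-type codes are illustrative, not our exact production format):

```python
def make_row_key(make, model, data_point, agg_type, ts_millis):
    """Build an HBase row key; embedding the timestamp means each
    micro-batch writes a fresh row instead of a new cell version."""
    model_part = model or "EMPTY"
    return "%s|%s|%s|%s|%013d" % (make, model_part, data_point, agg_type, ts_millis)

def make_columns(make, model, count, ts_millis):
    """Mirror the attributes under separate qualifiers in the 'data'
    family so the Lily Indexer Morphline never parses the row key."""
    return {
        "data:make": make,
        "data:makeModel": "%s %s" % (make, model) if model else "EMPTY",
        "data:count": str(count),
        "data:timestamp": str(ts_millis),
    }

key = make_row_key("Kia", "Sorento", "uniqueVisitors", "UVCURR", 1422842400000)
cols = make_columns("Kia", "Sorento", 923, 1422842400000)
```

Zero-padding the timestamp keeps rows in lexicographic time order within each make/model prefix, which makes time-range scans cheap.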
Lily Indexer Morphline and Solr Schema
Here is a snippet of the Morphline used by Lily Indexer to transform the HBase data into the Solr schema. The format of the Solr schema was chosen primarily based on the limitations of Banana.
mappings : [
  {
    inputColumn : "data:count"
    outputField : "count"
    type : long
    source : value
  },
  {
    inputColumn : "data:timestamp"
    outputField : "timestamp"
    type : string
    source : value
  }
]
Morphlines is an open source framework (inside the Kite SDK) with the sole intent of making ETL processes as painless to create, and as highly configurable, as possible. A Morphline file provides a sequence of commands to apply to your data; these commands are powerful enough to fully replace things like log-processing applications (which we have done with a separate Flume -> Morphlines -> Solr process).
Morphlines in this pipeline are used by Lily Indexer as the logic for transforming HBase values into Solr values. For example, the first mapping above says that the input column lives in the column family data under the qualifier count, that the cell value (rather than the qualifier) is extracted, and that the result goes into the Solr field count. Below is the Solr schema the Morphline targets:
<field name="timestamp" type="date" indexed="true" stored="true" />
<field name="count" type="long" indexed="false" stored="true"/>
<field name="valueType" type="string" indexed="true" stored="true"/>
<field name="aggregationType" type="string" indexed="true" stored="true"/>
<field name="make" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="makeModel" type="string" indexed="true" stored="true" default="EMPTY"/>
<field name="id" type="string" indexed="true" stored="true"/>
Banana can then make time-range queries against the Solr server based on the above schema to create time-series plots. The plot below shows page views per minute between 14:00 and 21:00 PST on Jan. 25, 2015, for each make.
Page views per minute per make on a normal Sunday. This is the Banana histogram panel, which supports grouping by a separate Solr field (in this case, make).
This is the query that generates the above plot (note timestamps are actually in GMT):
q=*%3A*&df=id&wt=json&rows=100000&fq=timestamp:[2015-02-01T22:00:00.000Z%20TO%202015-02-02T05:00:00.000Z]&fl=timestamp count&group=true&group.field=make&group.limit=100000&fq=aggregationType:PVCURR -make:EMPTY
You then can choose how often you wish Banana to query Solr for updates and voila, you have your streaming near real-time dashboard!
So you might be skeptical that we would actually see detectable differences to make and make model specific pages during the Super Bowl. Would people really visit Edmunds.com while the game is happening? Well, compare the previous snapshot to the snapshot below, which is for 16:00 to 21:00 on Super Bowl Sunday 2015 (on Feb. 1).
Super Bowl Sunday page views per minute. Note the peaks, which blow the normal noise ceiling of 450 out of the water!
Super Bowl XLIX ran from 3:30pm to 7:30pm PST, and compared to normal site traffic there are huge spikes, up to around 5,500 page views for Lexus near halftime, versus around 150 page views on a normal Sunday evening. No statistical tests are needed to determine whether those increases are significant!
Let’s look at a specific instance. At around 5:55pm (unfortunately we do not have exact times of when the commercials aired), there was a Kia Sorento commercial featuring Pierce Brosnan in a James Bond spoof. Kia is a much less popular make on Edmunds.com in general, so we see a smaller increase in unique visitors to Kia pages, peaking at 1,049 unique visitors at 6:10pm. This commercial, however, meant that the Kia Sorento finished as the third-most viewed model for the day.
Kia unique visitors per 5 minutes. Notice the peak at 6:10 to 1049 unique visitors.
Kia Sorento unique visitors per 5 minutes. Note the increase at 6:10 to 923 unique visitors.
Kia Sorento cumulative unique visitors. Note the huge increase at 6:10 which helped Kia Sorento finish as third-most visited model at Edmunds.com on Super Bowl Sunday.
As I hopefully have demonstrated, the Spark Streaming prototype was a success: it satisfied all of our requirements for presenting near real-time updates of unique visitors and page views for make and make/model pages on Edmunds.com. That the system could be put together in a tight timeframe, and was reliable enough to be used on a live Super Bowl campaign, is a testament to the conciseness and relative ease of the new way of thinking that Spark represents. So, what are you waiting for?