nugg.ad operates Europe’s largest targeting platform. The company’s core business is to derive targeting recommendations from clicks and surveys. We measure these, store them in log files and later make sense of them all. From 2007 until mid-2009 we used a classical data warehouse solution. As data volumes increased and performance suffered, we recognized that a new approach was needed. This post tells the story of how we arrived at a Hadoop-based solution, and how we took jobs that required five days to process down to one hour.
Data Processing Platform Requirements
The nugg.ad service is split into two parts. The online targeting platform, accessed via HTTP, provides real-time targeting recommendations in response to a user’s clicking behavior. The offline data processing platform performs the analytics to make this possible.
Currently our online platform creates just over 100 GB of log data per day. The majority of the data is for website clicks, ad clicks and targeting predictions. These are split into separate files for each category and rotated on an hourly basis. The data to be logged is sent in UDP packets to a series of log nodes implemented in Erlang.
The logging of user interactions with our online platform creates considerable amounts of data. We need to use all of this data in order to:
• Monitor the performance of ad campaigns
• Track the precision of targeting predictions
• Create reports used by our account managers and statisticians for decision making
• Summarize the data for our customers to help sell ad space
• Run ad hoc reports over historical data
• Extract training data for our prediction algorithms
• Build machine learning models
When We Were Young and Energetic
The initial solution for the data processing platform was built on the principles of classical data warehousing. The log files were collected from each of the log nodes and merged into one big file per category. The contents were then aggregated, and the results were used by the Pentaho Kettle ETL tool to populate star-schema tables in PostgreSQL. At this point we could query our data with SQL.
In March 2008 we needed to process and use 30 GB of log data per day. The processing times for our most important events were:
• 12 hours to summarize all daily log files for logged events
• 6 hours to create training data samples
• 2 days to create weekly reports
• 2 days to summarize data accessed via a web-based customer interface
We were far from satisfied with these times. However, they were good enough in the early days, and customer expectations of our service in 2008 were a lot lower than they are today.
Headache Turning Into a Migraine
One year later our situation had changed considerably. The graph below shows the amount of data we were logging from January 2009 until December 2009.
In March 2009 we were logging more than double the amount of data per day compared to the previous year, as the online platform went from receiving 3,200 to over 8,500 requests per second. The good news was that our business was growing nicely. Our online platform was straightforward to scale in tandem; we just added more machines. The bad news was that our data processing platform was not as simple to scale. In March 2009 the processing times for the same events mentioned earlier were:
• 23 hours to summarize all daily log files for all events (almost a day to process a day)
• 18 hours to create training data samples (inhibiting our ability to refresh data mining models regularly)
• 5 days to create weekly reports (our weekly reports were almost one week behind)
• 4 days to summarize data accessed via a web-based customer interface (prone to crashing)
We could no longer process our data in the time scales our business needed. The root cause of our problem was clear: a significant increase in data volume resulted in a significant increase in processing times. Initially we thought we might simply improve our classical data warehouse. After analyzing our existing solution, however, it was clear that we could not avoid hitting scaling and performance limits.
Searching for Relief, We Found Hadoop
In March 2009 we started our investigation of possible technologies. As part of it we set up a Hadoop test cluster with three machines and copied a selection of log files into HDFS. We started writing simple Pig scripts and later MapReduce jobs using the standard Hadoop API. It took some practice to go beyond the classical word-count example and write solutions to our own problems using MapReduce.
Since our initial tests looked promising, we decided to try and build our solution on Hadoop. Besides scaling there were several additional reasons:
• Relatively easy to administrate and monitor
• Easy to use (when you are a small team everyone needs to be able to change anything at anytime)
• No software licensing costs
• Only expansion cost is hardware
In June 2009 we started to develop a full solution. At this point certain external factors played a very helpful role in our development. One was the publication of the book “Hadoop: The Definitive Guide” by Tom White, which helped us understand how we could use Hadoop. The other was the emergence of Clojure, a dynamic programming language that compiles directly to JVM bytecode.
After one month of development we were able to create our reports using Hadoop, with processing times going from five days to one hour. This was great for building confidence in our decision. Over the following four months we progressively turned features off in the old data warehouse as they became available in Hadoop. By October 2009 we had completed the migration and had also added new features that had previously been impossible to run. In the next section I will briefly explain how it all works.
A Closer Look at our Hadoop Setup
Our cluster is located in one of our data centers and contains commodity machines with a total of 36 cores and 8 TB of disk space. The machines were provisioned using Chef from Opscode.com and we use the Cloudera CDH1 distribution.
The log files are now copied to HDFS. In order to make sense of our data we need it summarized by hours, days and weeks, so we organize our HDFS directory structure hierarchically by date to reflect this requirement. The path for handling days and hours follows the structure /event/year/month/day/hour. This way we can use simple file globs in a MapReduce job’s input configuration.
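To make the glob idea concrete, here is a small sketch of how such date-partitioned input paths could be built. This is plain Java with invented helper names, not our actual code; the directory layout follows the structure described above.

```java
import java.time.LocalDate;

/** Hypothetical helper for building HDFS input globs over the
    date-partitioned layout /event/year/month/day/hour described above. */
public class LogPaths {
    // Glob matching every hourly directory of one event type for a given
    // day, e.g. /adclick/2009/12/01/* covers all 24 hours of that day.
    static String dailyGlob(String event, LocalDate day) {
        return String.format("/%s/%04d/%02d/%02d/*",
                event, day.getYear(), day.getMonthValue(), day.getDayOfMonth());
    }

    // Glob covering a whole month of one event type, for weekly or
    // monthly summaries that read many days at once.
    static String monthlyGlob(String event, int year, int month) {
        return String.format("/%s/%04d/%02d/*/*", event, year, month);
    }

    public static void main(String[] args) {
        System.out.println(dailyGlob("adclick", LocalDate.of(2009, 12, 1)));
        System.out.println(monthlyGlob("adclick", 2009, 12));
    }
}
```

A glob string like this can then be handed directly to a MapReduce job as its input path, which is what makes the hierarchical layout so convenient.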
We wrote our own simple scheduler that queries HDFS to see whether input is available for which the corresponding output is missing. When the output cannot be found, a configuration containing the input and output paths is created and sent to our MapReduce server.
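The core of that check can be sketched in a few lines. This is a simplification with invented names: days are plain strings here, whereas the real scheduler queries HDFS for the actual paths.

```java
import java.util.*;

/** Minimal sketch of the scheduler's "find missing output" check.
    Days are plain strings standing in for HDFS paths; the real
    implementation lists directories in HDFS instead. */
public class MissingOutput {
    // Given the days for which input exists and the days for which summary
    // output has already been produced, return the days still needing a job.
    static List<String> daysToProcess(Set<String> inputDays, Set<String> outputDays) {
        List<String> missing = new ArrayList<>();
        for (String day : inputDays) {
            if (!outputDays.contains(day)) {
                missing.add(day);
            }
        }
        Collections.sort(missing); // process the oldest days first
        return missing;
    }

    public static void main(String[] args) {
        Set<String> in = new TreeSet<>(Arrays.asList("2009/12/01", "2009/12/02", "2009/12/03"));
        Set<String> out = new TreeSet<>(Arrays.asList("2009/12/01"));
        System.out.println(daysToProcess(in, out)); // days 02 and 03 still need jobs
    }
}
```

For each day returned, the scheduler builds a job configuration with the matching input glob and output path and posts it to the MapReduce server.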
The MapReduce server provides a JSON HTTP API for starting, querying and stopping jobs. It supports both scheduled and on-demand jobs. When the server receives a request to run a job, the event name is used to locate the associated chain of one or more Hadoop jobs. A unique identifier is returned, which can later be used to query or stop the job.
An example is the chain of jobs behind one of our daily reports. For each customer it contains a summary of page impressions and unique clients for every socio-demographic and product-interest prediction our online platform produces. First we fetch information stored in our customer database and add it to the distributed cache. The MapReduce phase sums up the page impressions for each user and counts the number of client ids for each prediction and possible outcome, e.g. the age class 30–39. The final phase performs a reduce-side join in which the internal customer ids are translated into information readable by account managers, using the data previously stored in the distributed cache.
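The reduce-side logic of that report can be sketched without any Hadoop machinery. In the sketch below, plain collections stand in for the MapReduce framework and all class and field names are invented for illustration; the real jobs run as chained MapReduce phases as described above.

```java
import java.util.*;

/** Plain-Java sketch of the daily report's reduce logic; collections
    stand in for MapReduce and all names here are invented. */
public class DailyReportSketch {
    // One logged prediction event: which customer's site, which client
    // (cookie id), and which prediction outcome, e.g. "age 30-39".
    static class Event {
        final String customer, client, outcome;
        Event(String customer, String client, String outcome) {
            this.customer = customer; this.client = client; this.outcome = outcome;
        }
    }

    // Phase 1: sum page impressions and count unique clients per
    // (customer, prediction outcome), as the reducers do.
    static Map<String, long[]> summarize(List<Event> events) {
        Map<String, Long> impressions = new HashMap<>();
        Map<String, Set<String>> clients = new HashMap<>();
        for (Event e : events) {
            String key = e.customer + "|" + e.outcome;
            impressions.merge(key, 1L, Long::sum);
            clients.computeIfAbsent(key, k -> new HashSet<>()).add(e.client);
        }
        Map<String, long[]> result = new TreeMap<>();
        for (Map.Entry<String, Long> en : impressions.entrySet())
            result.put(en.getKey(),
                    new long[]{en.getValue(), clients.get(en.getKey()).size()});
        return result;
    }

    // Phase 2: the join that replaces internal customer ids with readable
    // names, using the lookup table held in the distributed cache.
    static String readableRow(String key, long[] stats, Map<String, String> customerNames) {
        int sep = key.indexOf('|');
        String name = customerNames.getOrDefault(key.substring(0, sep), "unknown");
        return name + ", " + key.substring(sep + 1) + ": "
                + stats[0] + " impressions, " + stats[1] + " unique clients";
    }
}
```

The important property is that the expensive part (summing and deduplicating client ids) happens in the reducers, while the small customer lookup table travels via the distributed cache instead of being shuffled.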
At a later point we intend to use the MapReduce server API to build a web based interface fitting our purposes.
Frequently-run jobs are implemented in Clojure and Java using the Hadoop MapReduce API with a number of performance optimizations. These are:
• Compress map output to LZO, mainly to reduce disk IO during the shuffle phase
• Apply a Combiner to perform initial aggregation before the data arrives at the reducer
• Use our own Writable types together with RawComparator implementations, so keys can be compared without deserialization
• Add type hints in Clojure code to avoid the overhead of reflection
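To illustrate why the RawComparator optimization pays off: if keys are serialized in an order-preserving encoding, the sort during the shuffle can compare raw bytes and skip deserialization entirely. The following is a plain-Java illustration of that idea, not Hadoop’s actual interface or our production code.

```java
/** Why raw-byte comparison works: an int encoded with its sign bit
    flipped and written big-endian sorts the same way as the int itself,
    so serialized keys can be ordered without decoding them. */
public class RawIntCompare {
    // Order-preserving encoding of an int: flip the sign bit, then write
    // big-endian, so unsigned byte order matches signed int order.
    static byte[] toBytes(int v) {
        int u = v ^ Integer.MIN_VALUE; // shift the signed range into unsigned order
        return new byte[]{(byte)(u >>> 24), (byte)(u >>> 16), (byte)(u >>> 8), (byte)u};
    }

    // Compare two serialized keys byte by byte, without decoding them.
    static int compareBytes(byte[] a, byte[] b) {
        for (int i = 0; i < a.length && i < b.length; i++) {
            int x = a[i] & 0xff, y = b[i] & 0xff;
            if (x != y) return x < y ? -1 : 1;
        }
        return Integer.compare(a.length, b.length);
    }
}
```

During the shuffle this comparison runs once per pair of keys being sorted, so avoiding an object allocation and a decode each time adds up quickly on large inputs.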
We also use tools like Pig to run ad hoc reports, as well as streaming jobs, e.g. to grep the contents of the web servers’ logs.
Our statisticians and account managers often use spreadsheets to analyze data. We therefore wrote an OutputFormat class which generates Excel workbooks containing a number of sheets summarizing customer data.
Older and a Little Wiser
The processing times for our most important events in December 2009 were:
• 42 minutes to summarize all daily log files for all events
• 1 hour to create training data samples
• 1 hour to create weekly reports
• 3 hours to summarize data accessed via a web-based customer interface
Hadoop has helped us dramatically reduce the time taken to process our data. We can now expand both our online and data processing platforms in the same way: by simply adding more machines.
A recent interesting development in our market is enabling different customers to share their data with each other for variable time frames. Combining data shared by several willing customers involves finding and processing huge training sets for our prediction algorithms. If we had not migrated, we could never have made this possible.
A potential next step for us would be to use column-oriented stores with MapReduce integration. Some of the options in the Hadoop ecosystem include Zebra (Pig), RCFile (Hive), or HBase. If this proves to be successful I look forward to writing the follow-up post. Moving from one hour to one minute sounds good.
With its Predictive Behavioral Targeting solution nugg.ad operates Europe’s largest targeting platform. nugg.ad’s unique predictive algorithm reduces media loss, increases campaign efficiency and lowers target-group CPM. nugg.ad works with and assists its clients to increase turnover and win new advertising budgets as it delivers predicted values on socio-demographics, gender and product interests making it possible to target hard-to-reach target groups online.