This post is courtesy of Kumanan Rajamanikkam, Lead Engineer at Wordnik.
Wordnik’s Processing Challenge
At Wordnik, our goal is to build the most comprehensive, high-quality understanding of English text. We make our findings available through a robust REST API and www.wordnik.com. Our corpus grows quickly, at up to 8,000 words per second. Performing deep lexical analysis on data at this rate is challenging, to say the least.
We had major challenges with three distinct problems:
- Updating our Lucene search indexes at a rapid rate. Most of the time we’re appending data, but updates are also frequent. New algorithms are often put in place that require a complete rebuild of the index.
- Updating our runtime aggregate data based on our lexical analysis techniques.
- Allowing for ad-hoc queries against our corpus by our R&D team.
When speaking about the impact of Apache Hadoop on our analytical workflow, we have painful and fresh memories of tackling these problems without a MapReduce cluster, effectively brute-forcing the solution with large servers. Rebuilding our search indexes took between 2 and 3 weeks. Generating aggregate statistics took between 3 days and 2 weeks. Running simple algorithms against our whole corpus would typically take 3-4 days, and that was when everything went exactly right.
We would split processing tasks across multiple machines and start jobs manually. This involved dividing our corpus (documents) into tasks and assigning jobs (ranges of IDs) to different servers. We would then monitor their progress and strive to keep all servers busy 24×7. Unfortunately, writing our own task tracker system was more work than setting the alarm to get up at 2am to make sure jobs were still running. This was a tiresome process that wasn’t good for anyone.
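As a rough illustration of the manual approach described above, here is a minimal Python sketch of dividing an inclusive range of document IDs into contiguous jobs, one per server. The function name `split_id_range` and the example IDs are hypothetical; the post does not describe the actual scripts that were used.

```python
def split_id_range(first_id, last_id, num_workers):
    """Divide the inclusive ID range [first_id, last_id] into at most
    num_workers contiguous (start, end) ranges of near-equal size."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    total = last_id - first_id + 1
    if total <= 0:
        return []
    # Every worker gets `base` IDs; the first `extra` workers get one more.
    base, extra = divmod(total, num_workers)
    ranges = []
    start = first_id
    for i in range(num_workers):
        size = base + (1 if i < extra else 0)
        if size == 0:
            break  # more workers than IDs: leave the rest idle
        ranges.append((start, start + size - 1))
        start += size
    return ranges


if __name__ == "__main__":
    # Hypothetical example: ten documents spread over three servers.
    for server, id_range in enumerate(split_id_range(1, 10, 3)):
        print(server, id_range)
```

Equal-sized ID ranges are only balanced if the documents are roughly the same size, and someone still has to notice when a server dies partway through its range. Both of those gaps are what a real job tracker fills.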
Our corpus is stored in MongoDB, which has worked out to be an excellent storage mechanism for Wordnik. When queried efficiently (aka the “right way”), we have been able to get huge amounts of data into our production system in a very fast and predictable fashion. But R&D folks often need to issue hairy, unexpected queries, and those can cause blocking scenarios in Mongo. Since we’re constantly appending data, we have to protect our runtime datastore from blocking queries. Thus, we simply cannot allow ad-hoc queries against our MongoDB instance. Without the benefit of being a super-rich startup, we can’t just provision servers and storage devices for these ad-hoc queries, especially when we have such a large storage requirement. Using resources in the best way possible is therefore always at the top of our list.
Done Right with Hadoop
We became tired of the babysitting and had a couple of major deadlines which made multi-week-long processing tasks impossible. We provisioned a number of Rackspace VMs and commissioned Cloudera to help set up base images and management scripts. They then helped us migrate some of our analytic code into a MapReduce-friendly format. With that, it didn’t take long to have a CDH3 Hadoop distribution image ready for provisioning to a cluster of VMs. Going into this, we expected the VMs to drastically underperform our physical servers in the datacenter, but we were confident that the ability to turn them up as needed would outweigh the performance-per-dollar metric.
We then started the work on migrating our logic into Hadoop-friendly jobs. We had a couple choices for the source data:
- Write a custom DBInputFormat class which reads directly from our MongoDB cluster. Each Map task would be responsible for a range of IDs.
- Export the data and load it into HDFS
Given the need for ad-hoc analysis, we chose the latter route, which requires more storage but relieves our MongoDB instance from the dreaded long-running, open cursors. This also made sense given the append-only nature of our data. Incremental updates to our data are stored in flat files, which we periodically import into HDFS. We also implemented a simple changelog to track updates to data already transferred to HDFS, so that those updates can be propagated to the Hadoop cluster.
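The post does not describe the changelog format, so the following is only a sketch of the general idea under stated assumptions: each changelog entry is a hypothetical `(sequence, doc_id, operation, payload)` tuple, and entries are replayed in sequence order over a previously exported snapshot. The name `apply_changelog` and the `"upsert"` and `"delete"` operations are illustrative, not Wordnik's actual implementation.

```python
def apply_changelog(snapshot, changelog):
    """Return a new snapshot (dict of doc_id -> document) with the
    changelog entries applied in ascending sequence order.

    Each entry is a hypothetical (sequence, doc_id, operation, payload)
    tuple, where operation is "upsert" or "delete".
    """
    merged = dict(snapshot)  # leave the exported snapshot untouched
    for _seq, doc_id, operation, payload in sorted(changelog, key=lambda e: e[0]):
        if operation == "upsert":
            merged[doc_id] = payload
        elif operation == "delete":
            merged.pop(doc_id, None)
        else:
            raise ValueError(f"unknown operation: {operation!r}")
    return merged
```

Replaying by sequence number means a later update to a document always wins, even if the changelog files arrive out of order.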
Our search indexes are Lucene indexes. They are created as RAM indexes in the mapper and emitted as bytes. These files are in the terabyte range, and getting the MR jobs to execute reliably and efficiently required a couple of simple but critical tuning steps.
Our first tuning step concerned RAM. It’s crucial to understand the memory requirements of the documents used in the tasks. For example, a blog post may contain 300 words while a typical book can range between 10 and 100,000 words. If mapper tasks split on a count of documents, you’re looking at potentially huge discrepancies in RAM requirements between splits. You then overshoot or undershoot the processing capability of the node, slowing down the process overall. In our use case we had to split our documents into smaller chunks of data (sentences), since sentence sizes vary far more predictably. This made mapper tasks much more consistently sized, which gave us better throughput.
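To make the effect concrete, here is a small Python sketch comparing the word count per split when splitting on documents versus on sentences. The regex-based sentence splitter and the helper names are simplifying assumptions for illustration only; a real pipeline would use a proper sentence tokenizer.

```python
import re


def split_sentences(text):
    """Naive splitter: break after '.', '!' or '?' followed by whitespace."""
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return [p for p in parts if p]


def chunk(items, size):
    """Group items into consecutive splits of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def words_per_split(splits):
    """Total word count of each split, a rough proxy for its RAM needs."""
    return [sum(len(unit.split()) for unit in split) for split in splits]


def compare_split_strategies(documents, docs_per_split, sentences_per_split):
    """Return (word counts splitting by document, word counts by sentence)."""
    by_document = chunk(documents, docs_per_split)
    sentences = [s for doc in documents for s in split_sentences(doc)]
    by_sentence = chunk(sentences, sentences_per_split)
    return words_per_split(by_document), words_per_split(by_sentence)
```

With one short document and one long one, splitting by document gives splits as uneven as the documents themselves, while splitting by a fixed number of sentences keeps each split close to the same size.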
Depending on the size of the map tasks, we ran into sporadic timeouts which required overriding the mapred.merge.recordsBeforeProgress configuration. Unfortunately, these timeouts typically did not surface until many hours into a job, so it’s worth watching for them early on.
The time to rebuild our entire index with a 10-node cluster on Rackspace VMs (16GB RAM) has been reduced from 3 weeks down to 3.5 days. We have seen a linear improvement in processing time with cluster growth, and thanks to the images and scripts from Cloudera, a larger cluster is now trivial to spin up. Our aggregate statistics can be regenerated in one day, down from weeks.
The speed of processing has enabled us to free up resources and allow the same cluster to be used for R&D analysis.
Hopefully the day will come when we can have slotless MapReduce, which would allow a variable number of map or reduce tasks per node. Ideally, we should be able to configure the maximum number of tasks that can be executed per node purely based on the number of CPUs/cores. Then it would be up to the task tracker to decide how many tasks to allocate within that allowed range, based on the memory requirements of each job and how much free memory is available on the node. This would give us maximum flexibility.
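The allocation rule described above can be sketched in a few lines of Python. This is a hypothetical illustration of the wish, not an existing Hadoop API: the function name and parameters are assumptions, and memory is expressed in megabytes.

```python
def tasks_to_allocate(cores, free_memory_mb, task_memory_mb, tasks_per_core=1):
    """Number of tasks a node could take on right now.

    The CPU count sets the upper bound; free memory divided by the
    per-task memory requirement of the job decides how much of that
    bound can actually be used.
    """
    if task_memory_mb <= 0:
        raise ValueError("task_memory_mb must be positive")
    max_by_cpu = cores * tasks_per_core
    max_by_memory = free_memory_mb // task_memory_mb
    return max(0, min(max_by_cpu, max_by_memory))
```

For example, a node with 8 cores and 16384 MB free would run 4 tasks of a job needing 4096 MB per task, but all 8 tasks of a job needing only 512 MB each.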
Since we’re working with large Lucene Indexes, the Katta project may be helpful with our Hadoop cluster.
Overall, Hadoop helped us greatly. We’re moving more logic to be Hadoop-enabled, including using HBase for ad-hoc analytics. As with many others, Wordnik has left the one-size-fits-all relational world and is moving towards a split: MongoDB document storage for runtime data, and Hadoop for analytics, data processing, reporting, and time-based aggregation.