Hadoop at Twitter (part 1): Splittable LZO Compression

This summer I sent the following tweet, “Had lunch today at Twitter HQ. Thanks for the invite, @kevinweil! Great lunch conversation. Smart, friendly and fun team.” Kevin Weil leads the analytics team at Twitter and is an active member of the Hadoop community, and his colleague Eric Maland leads Operations.  Needless to say, Twitter is doing amazing things with Hadoop.  This guest blog from Kevin and Eric covers one of Twitter’s open-source projects, which provides splittable LZO compression for Hadoop. – Matt

At Twitter we are significantly ramping up our usage of Hadoop to help us analyze the massive amounts of data that our platform generates each day.  We are happy users of Cloudera’s free distribution of Hadoop; we’re currently running Hadoop 0.20.1 with Pig 0.4.  In this first of a small series of posts about our architecture and the open source software we’re working on around it, we’d like to focus on an infrastructure-level solution we use to make our cluster more efficient: splittable LZO for Hadoop.  LZO compression reduces data size and shortens disk read times, and LZO’s block-based structure allows files to be split into chunks for parallel processing.  Taken together, these characteristics make LZO an excellent compression format to use in your cluster.

Splittable LZO and Hadoop

The original splittable LZO work was done by Johan Oskarsson, formerly of Last.fm and soon to be joining us at Twitter (welcome, Johan!).  Chris Goffinet also wrote a blog post on the topic for Cloudera a while back, but many people we’ve talked to don’t understand how the technique works, or why it’s effective.

It’s worth mentioning that there are three different versions of the LZO code for Hadoop floating around: the original Google Code project, Chris Goffinet’s version, and our version.  Chris’s version is a backport of the Google Code project to Hadoop 0.18.3 with a couple of fixes.  Our version works with Hadoop 0.20.1, includes Chris’s fixes, and adds further fixes that came from running the code at scale.  Our repository also has InputFormats for both the new and the old MapReduce API, since you generally want to write jobs against the new API, but Hadoop streaming still requires the old one.  Finally, we have refactored some of the core code to pull static inner classes out into their own classes for better reuse.  This code is currently running 24 hours a day at Twitter, and we look forward to working with Owen O’Malley and Arun Murthy to get it integrated back into the main Google Code project soon.

Let’s back up one step and talk briefly about compression and Hadoop.  Storing compressed data in HDFS lets your hardware allocation go further, since compressed data is often 25% of the size of the original data.  Furthermore, since MapReduce jobs are nearly always IO-bound, storing compressed data means there is less overall IO to do, so jobs run faster.  There are two caveats to this, however: some compression formats cannot be split for parallel processing, and others are slow enough at decompression that jobs become CPU-bound, eliminating your gains on IO.

The gzip compression format illustrates the first caveat, and to understand why, we need to go back to how Hadoop’s input splits work.  Imagine you have a 1.1 GB gzip file, and your cluster has a 128 MB block size.  This file will be split into 9 chunks of approximately 128 MB each.  In order to process these in parallel in a MapReduce job, a different mapper will be responsible for each chunk.  But this means that the second mapper will start on an arbitrary byte about 128 MB into the file.  Gzip decompression relies on a dictionary built up from everything read before that point, so at an arbitrary offset the decompressor has no context and cannot correctly interpret the bytes.  The upshot is that large gzip files in Hadoop need to be processed by a single mapper, which defeats the purpose of parallelism.

For an example of the second caveat, in which jobs become CPU-bound, we can look at the bzip2 compression format.  Bzip2 files compress well and are even splittable, but the decompression algorithm is slow and cannot keep up with the streaming disk reads that are common in Hadoop jobs.  While bzip2 compression conserves storage space, jobs now spend their time waiting on the CPU to finish decompressing data, which slows them down and offsets the other gains.
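
To make the first caveat concrete, here is a minimal Java sketch of the situation a second mapper would find itself in.  It is purely illustrative (the some_logs.gz file name comes from the example below; everything else is standard java.util.zip): decompression from the start of a gzip file works, while decompression from an arbitrary 128 MB offset fails immediately.

import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

public class GzipSplitDemo {
  public static void main(String[] args) throws IOException {
    // Decompressing from the beginning of the file works as expected.
    try (GZIPInputStream in = new GZIPInputStream(new FileInputStream("some_logs.gz"))) {
      in.read(new byte[4096]);
      System.out.println("Reading from offset 0: OK");
    }

    // Simulate a mapper handed the second 128 MB chunk: skip ahead in the raw
    // file and try to decompress from that arbitrary byte offset.
    try (FileInputStream raw = new FileInputStream("some_logs.gz")) {
      raw.skip(128L * 1024 * 1024);
      // The GZIPInputStream constructor fails here: the gzip header and the
      // decompression state it needs live at the start of the stream, not at
      // this offset.
      GZIPInputStream in = new GZIPInputStream(raw);
      in.read(new byte[4096]);
    } catch (IOException e) {
      System.out.println("Reading from offset 128 MB: " + e);
    }
  }
}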

So is there something that balances these two extremes and solves both problems at once?  As you’ve probably guessed, the answer is LZO.  The LZO compression format is composed of many smaller (~256K) blocks of compressed data, allowing jobs to be split along block boundaries.  Moreover, it was designed with speed in mind: it decompresses about twice as fast as gzip, meaning it’s fast enough to keep up with hard drive read speeds.  It doesn’t compress quite as well as gzip — expect files that are on the order of 50% larger than their gzipped version.  But that is still 20-50% of the size of the files without any compression at all, which means that IO-bound jobs complete the map phase about four times faster.  Here’s a typical example, starting with an 8.0 GB file containing some text-based log data:

Compression   File            Size (GB)   Compression Time (s)   Decompression Time (s)
None          some_logs       8.0         -                       -
Gzip          some_logs.gz    1.3         241                     72
LZO           some_logs.lzo   2.0         55                      35

As you can see, the LZO file is slightly larger than the corresponding gzip file, but both are much smaller than the original uncompressed file.  Additionally, the LZO file compressed nearly five times faster, and decompressed over two times faster.

We mentioned above that LZO files can be split as long as the splits occur on block boundaries, so how can we ensure that this happens?  If we go back to our hypothetical 1.1 GB file, assuming now that it’s LZO-compressed instead, the mapper that gets the second 128 MB chunk needs to be able to find the beginning of the next LZO block in order to start decompressing.  LZO does not write any magic bytes into its block headers, so a priori there is no way to identify the next header.  The solution is to perform a one-time indexing of the LZO file, writing a foo.lzo.index file for each foo.lzo file.  The index file simply contains the list of byte offsets of each block, and it’s fast to write because the indexing process is mostly seeking and thus moves at hard drive read speed.  Typically we see 90-100 MB/s indexing speed, i.e. 10-12 seconds per GB.

Once the index file has been created, any LZO-based input format can split compressed data by first loading the index and then nudging the default input splits forward to the next block boundaries.  With these nudged splits, each mapper gets an input split that is aligned to block boundaries, meaning it can more or less just wrap its InputStream in an LzopInputStream and be done.  If you currently have a job that relies on TextInputFormat, for example, you can LZO-compress your data, make sure it’s indexed, rename TextInputFormat to LzoTextInputFormat, and your job will run just like before, only likely faster.  Incidentally, it’s worth noting that unindexed LZO files still work with MapReduce jobs; they just won’t be splittable.
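
To give a feel for how the nudging works, here is a simplified, purely illustrative sketch (the class and method names are hypothetical, not the actual hadoop-lzo code): given the sorted block offsets loaded from a foo.lzo.index file, a default split’s start and end are simply moved forward to the next block boundary.

import java.util.TreeSet;

public class LzoSplitAligner {
  // Sorted byte offsets of the LZO blocks, as read from foo.lzo.index.
  private final TreeSet<Long> blockOffsets = new TreeSet<Long>();

  public LzoSplitAligner(long[] offsetsFromIndexFile) {
    for (long offset : offsetsFromIndexFile) {
      blockOffsets.add(offset);
    }
  }

  // Returns the first block boundary at or after pos, or the file length if
  // there is no later block.
  public long alignForward(long pos, long fileLength) {
    Long boundary = blockOffsets.ceiling(pos);
    return (boundary != null) ? boundary : fileLength;
  }

  // Nudges a default [start, end) split forward so it begins and ends on block
  // boundaries.  The bytes between the old and new start are not lost: they are
  // covered by the previous split, whose end was nudged forward the same way.
  public long[] alignSplit(long start, long end, long fileLength) {
    return new long[] { alignForward(start, fileLength), alignForward(end, fileLength) };
  }
}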

Setting up LZO with Hadoop

The steps for setting up LZO with Hadoop are simple.

  1. Get the lzop native libraries.
    Installing on Mac
    sudo port install lzop lzo2

    Installing on Red Hat-based systems
    sudo yum install liblzo-devel

    Installing on Debian-based systems
    sudo apt-get install liblzo2-dev
  2. Clone our github repo and build it according to the instructions in the README.
  3. Place the hadoop-lzo-*.jar somewhere on your cluster nodes; we use /usr/local/hadoop/lib
  4. Place the native hadoop-lzo binaries (which are JNI-based and used to interface with the lzo library directly) on your cluster as well; we use /usr/local/hadoop/lib/native/<arch>/
  5. Add the following to your core-site.xml:
    <property>
      <name>io.compression.codecs</name>
      <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec</value>
    </property>
    <property>
      <name>io.compression.codec.lzo.class</name>
      <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
  6. Add the following property to mapred-site.xml.  We should note that the Cloudera guys backported the patch implementing this into their distribution for us to make configuration easier (thanks!).
    <property>
      <name>mapred.child.env</name>
      <value>JAVA_LIBRARY_PATH=/path/to/your/native/hadoop-lzo/libs</value>
    </property>
  7. If you would like to use LZO to compress map outputs as well, add the following to your mapred-site.xml.  This setting controls how the map output data, which must be sent over the network and then written to disk on the node running the reduce, is compressed.  Since this is an IO-heavy operation, it is another area where LZO compression can make your job significantly faster (see the note and per-job sketch after this list).
    <property>
      <name>mapred.map.output.compression.codec</name>
      <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
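
One note on step 7: the codec property above only says which codec to use for map output; in Hadoop 0.20 the compression itself is switched on by the boolean property mapred.compress.map.output, either cluster-wide in mapred-site.xml or per job.  Here is a minimal per-job sketch (the class and method names are ours for illustration; the property names are the standard Hadoop 0.20 ones):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class LzoMapOutputConfig {
  // Returns a Job whose intermediate map output is LZO-compressed, without
  // relying on cluster-wide mapred-site.xml settings.
  public static Job newLzoCompressedJob(String name) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("mapred.compress.map.output", true);
    conf.set("mapred.map.output.compression.codec",
             "com.hadoop.compression.lzo.LzoCodec");
    return new Job(conf, name);
  }
}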

That’s it!  Now upload a bunch of LZO files to your cluster; imagine you put them in /lzo_logs.  Make sure the hadoop-lzo jar is on your classpath (we export the directory /usr/local/hadoop/lib as part of HADOOP_CLASSPATH in hadoop-env.sh), then run the following command to index your log files:

$ hadoop jar /path/to/hadoop-lzo.jar com.hadoop.compression.lzo.LzoIndexer /lzo_logs
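
If you would rather kick off indexing from Java code than from the shell, the same class can be used programmatically.  The snippet below is a hedged sketch: it assumes LzoIndexer exposes a constructor taking a Configuration and an index(Path) method, so check the repo’s README if the API differs in the version you build.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import com.hadoop.compression.lzo.LzoIndexer;

public class IndexLzoLogs {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    LzoIndexer indexer = new LzoIndexer(conf);
    // Walks /lzo_logs and writes a foo.lzo.index file next to each foo.lzo.
    indexer.index(new Path("/lzo_logs"));
  }
}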

Take any job you used to have running over those files, say a WordCount, and run it just like before but using the hadoop-lzo LzoTextInputFormat in place of the TextInputFormat.  Voila, splittable LZO!
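
As a concrete sketch, here is what a new-API WordCount driver might look like with the swap in place.  The mapper and reducer below are a generic word count, not our production code, and the package name assumed for LzoTextInputFormat (com.hadoop.mapreduce) may differ slightly depending on the hadoop-lzo version you build.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.hadoop.mapreduce.LzoTextInputFormat;

public class LzoWordCount {

  public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokenizer = new StringTokenizer(value.toString());
      while (tokenizer.hasMoreTokens()) {
        word.set(tokenizer.nextToken());
        context.write(word, ONE);
      }
    }
  }

  public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "lzo wordcount");
    job.setJarByClass(LzoWordCount.class);

    job.setMapperClass(TokenMapper.class);
    job.setCombinerClass(SumReducer.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // The only change from a plain-text WordCount: a splittable, LZO-aware input format.
    job.setInputFormatClass(LzoTextInputFormat.class);

    FileInputFormat.addInputPath(job, new Path("/lzo_logs"));
    FileOutputFormat.setOutputPath(job, new Path("/lzo_wordcount_output"));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}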

All of our raw data at Twitter is stored in an LZO-compressed, splittable format.  Since MapReduce jobs are generally IO-bound, storing data in compressed formats yields significantly faster computations.  With splittable LZO, you can achieve the full parallelism offered by the MapReduce architecture in addition to saving on disk space.  We hope the explanation and code help you to test out splittable LZO compression in your own Hadoop cluster!

In a followup to this post, we’ll examine the structure of Hadoop InputFormats and Pig LoadFuncs for use with LZO-compressed data. For more about the kinds of analyses we do with Hadoop and Pig at Twitter, please see the slides from my recent talk at NoSQL East.  If this sounds like interesting work, Twitter is hiring engineers in Analytics, Infrastructure, Frontend, App Services, and Operations, and we’d love to talk to you.

16 Responses
  • Jay / November 17, 2009 / 6:14 PM

    Nice work!

    I guess just using the zip format would do. Since all the indexing information is at the end of a zip file, a custom zip input format could be made to split the file contents accordingly. Each mapper could read the tail of the zip file and then access the region assigned to its map task.

  • Vladimir Rodionov / November 20, 2009 / 2:24 PM

    Can you give us some numbers? What is the performance gain?

  • Maciej / November 25, 2009 / 10:22 AM

    Great. You start with 2 compression algorithms that are worthless here. Then claim that there’s a 3rd one and the fact that it beats them makes it great.
    But really, if you’re going to compress log files and a 256MB block is fine, LZO is a very poor option unless you *really* need *all* its speed.

    Something like Tornado (see: FreeArc) with a 256MB block would be much stronger than anything you mention here and faster than gzip.
    And it would let you split the job.

    Oh, and it’s untrue that gzip decompression can’t be split. Just like with any compressor, it can. You just need the right container, which splits the files before compression. FreeArc can do it, but really, making a home-grown solution wouldn’t be hard at all.

  • schubertzhang / March 11, 2010 / 8:08 PM

    When using such a splitting method, how do you deal with record/row boundaries?

    A record/row may be truncated across two adjacent compression blocks. If the two adjacent blocks are logically split into different Mappers, it is difficult to detect the incomplete record/row and combine the two parts.

  • Lekhnath / July 07, 2010 / 5:34 AM

    Great work! Would love to test it.

  • kailashbuki / December 20, 2010 / 10:28 PM

    Thanks for sharing your ideas!!!

  • Ravi / January 03, 2011 / 3:46 AM

    How do we test whether LZO compression/decompression is working correctly?
