A Guide to Python Frameworks for Hadoop

I recently joined Cloudera after working in computational biology/genomics for close to a decade. My analytical work is primarily performed in Python, along with its fantastic scientific stack. It was quite jarring to find out that the Apache Hadoop ecosystem is primarily written in/for Java. So my first order of business was to investigate some of the options that exist for working with Hadoop from Python.

In this post, I will provide an unscientific, ad hoc review of my experiences with some of the Python frameworks that exist for working with Hadoop, including:

  • Hadoop Streaming
  • mrjob
  • dumbo
  • hadoopy
  • pydoop
  • and others

Ultimately, in my analysis, Hadoop Streaming is the fastest and most transparent option, and the best one for text processing. mrjob is best for rapidly working on Amazon EMR, but incurs a significant performance penalty. dumbo is convenient for more complex jobs (objects as keys; multistep MapReduce) without incurring as much overhead as mrjob, but it’s still slower than Streaming.

Read on for implementation details, performance comparisons, and feature comparisons.

Toy Problem Definition

To test out the different frameworks, we will not be doing “word count”. Instead, we will be transforming the Google Books Ngram data. An n-gram is a synonym for a tuple of n words. The n-gram data set provides counts for every single 1-, 2-, 3-, 4-, and 5-gram observed in the Google Books corpus grouped by year. Each row in the n-gram data set is composed of 3 fields: the n-gram, the year, and the number of observations. (You can explore the data interactively here.)

We would like to aggregate the data to count the number of times any pair of words is observed near each other, grouped by year. This would allow us to determine whether any pair of words is statistically near each other more often than we would expect by chance. Two words are “near” if they are observed within 4 words of each other; equivalently, two words are near each other if they appear together in any 2-, 3-, 4-, or 5-gram. So a row in the resulting data set would consist of a 2-gram, a year, and a count.

There is one subtlety that must be addressed. The n-gram data set for each value of n is computed across the whole Google Books corpus. In principle, given the 5-gram data set, I could compute the 4-, 3-, and 2-gram data sets simply by aggregating over the correct n-grams. For example, if the 5-gram data set contains records like (the counts here are invented, purely to illustrate the format)

    (the, cat, is, on, the)     1999    12
    (the, cat, is, on, a)       1999    23

then we could aggregate these into 2-gram records like

    (the, cat)                  1999    12 + 23 + ...

where, roughly speaking, the count sums over every 5-gram that begins with that pair.

However, in practice, Google only includes an n-gram if it is observed more than 40 times across the entire corpus. So while a particular 5-gram may be too rare to meet the 40-occurrence threshold, the 2-grams it is composed of may be common enough to exceed the threshold in the Google-supplied 2-gram data. For this reason, we use the 2-gram data for words that are next to each other, the 3-gram data for pairs of words that are separated by one word, the 4-gram data for pairs of words that are separated by 2 words, etc. In other words, given the 2-gram data, the only additional information the 3-gram data provide is the pair of outermost words of the 3-gram. In addition to being more sensitive to potentially rare n-grams, using only the outermost words of the n-grams helps ensure we avoid double counting. In total, we will be running our computation on the combination of the 2-, 3-, 4-, and 5-gram data sets.

The MapReduce pseudocode to implement this solution would look like so:
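
A minimal runnable rendering of that pseudocode is sketched below; map_fn, reduce_fn, and emit are placeholder names rather than the API of any particular framework:

    def emit(key, value):
        # stand-in for whatever output mechanism the framework provides
        print key, value

    def map_fn(record):
        ngram, year, count = record                    # e.g. (('the', 'cat', 'is'), 1999, 12)
        word1, word2 = sorted((ngram[0], ngram[-1]))   # outermost words, in a canonical order
        emit((word1, word2, year), count)

    def reduce_fn(key, counts):
        emit(key, sum(counts))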

 

Hardware

These MapReduce jobs are executed on a ~20 GB random subset of the data. The full data set is split across 1500 files; we select a random subset of the files using this script. The filenames remain intact, which is important because the filename identifies the value of n in the n-grams for that chunk of data.

The Hadoop cluster comprises five virtual nodes running CentOS 6.2 x64, each with 4 CPUs, 10 GB RAM, 100 GB disk, running CDH4. The cluster can execute 20 maps at a time, and each job is set to run with 10 reducers.

The software versions I worked with on the cluster were as follows:

  • Hadoop: 2.0.0-cdh4.1.2
  • Python: 2.6.6
  • mrjob: 0.4-dev
  • dumbo: 0.21.36
  • hadoopy: 0.6.0
  • pydoop: 0.7 (PyPI) and the latest version on git repository
  • Java: 1.6

Implementations

Most of the Python frameworks wrap Hadoop Streaming, while others wrap Hadoop Pipes or implement their own alternatives. Below, I will discuss my experience with a number of tools for using Python to write Hadoop jobs, along with a final comparison of performance and features. One of the features I am interested in is the ease of getting up and running, so I did not attempt to optimize the performance of the individual packages.

As with every large data set, there are bad records. We check for a few kinds of errors in each record including missing fields and wrong n-gram size. For the latter case, we must know the name of the file that is being processed in order to determine the expected n-gram size.

All the code is available in this GitHub repo.

Hadoop Streaming

Hadoop Streaming is the canonical way of supplying any executable to Hadoop as a mapper or reducer, including standard Unix tools or Python scripts. The executable must read from stdin and write to stdout using agreed-upon semantics. One of the disadvantages of using Streaming directly is that while the inputs to the reducer are grouped by key, they are still iterated over line-by-line, and the boundaries between keys must be detected by the user.

Here is the code for the mapper:
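
A minimal sketch of such a mapper, assuming tab-separated input and reading the input file name from the environment variable that Streaming sets for each task (map_input_file on this MR1-based cluster, mapreduce_map_input_file on newer releases):

    #!/usr/bin/env python
    # mapper.py -- emit (word1, word2, year) -> count for the outermost words of each n-gram

    import os
    import re
    import sys

    # determine n for this block of n-grams by parsing the input file name
    input_file = os.environ.get('map_input_file',
                                os.environ.get('mapreduce_map_input_file', ''))
    match = re.search(r'(\d)gram', os.path.basename(input_file))
    expected_tokens = int(match.group(1)) if match else 0

    for line in sys.stdin:
        data = line.rstrip('\n').split('\t')

        # skip records with missing fields
        if len(data) < 3:
            continue

        ngram = data[0].split()
        year = data[1]
        count = data[2]

        # skip records whose n-gram size does not match the file name
        if expected_tokens == 0 or len(ngram) != expected_tokens:
            continue

        # key: the two outermost words in sorted order, plus the year
        pair = sorted([ngram[0], ngram[-1]])
        print '%s\t%s\t%s\t%s' % (pair[0], pair[1], year, count)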

 

And here is the reducer:
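
Again, a minimal sketch; note how the reducer must detect key boundaries itself by comparing each key to the previous one:

    #!/usr/bin/env python
    # reducer.py -- sum the counts for each (word1, word2, year) key

    import sys

    prev_key = None
    total = 0

    for line in sys.stdin:
        data = line.rstrip('\n').split('\t')
        if len(data) != 4:
            continue
        key = '\t'.join(data[:3])
        count = int(data[3])

        # a new key means we have seen all values for the previous one
        if prev_key is not None and key != prev_key:
            print '%s\t%i' % (prev_key, total)
            total = 0

        prev_key = key
        total += count

    # emit the final key
    if prev_key is not None:
        print '%s\t%i' % (prev_key, total)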

 

Hadoop Streaming separates the key and value with a tab character by default. Because we also separate the fields of our key with tab characters, we must tell Hadoop that the first three fields are all part of the key by passing these options:
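
The relevant Streaming properties look roughly like this (passed with -jobconf, as was customary on this Hadoop version; newer releases prefer the generic -D option):

    -jobconf stream.num.map.output.key.fields=3 \
    -jobconf stream.num.reduce.output.key.fields=3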

 

The command to execute the Hadoop job is
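
A sketch of the full invocation; the Streaming jar path and the HDFS input/output paths are assumptions for a CDH4 (MR1) layout and should be adjusted to your cluster:

    hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-*.jar \
            -input /ngrams \
            -output /output-streaming \
            -mapper mapper.py \
            -reducer reducer.py \
            -file mapper.py \
            -file reducer.py \
            -jobconf stream.num.map.output.key.fields=3 \
            -jobconf mapred.reduce.tasks=10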

 

Note that the files mapper.py and reducer.py must be specified twice on the command line: the first time points Hadoop at the executables, while the second time tells Hadoop to distribute the executables around to all the nodes in the cluster.

Hadoop Streaming is clean, and very explicit about what is happening under the hood. In contrast, the Python frameworks all perform their own serialization/deserialization, which can consume additional resources in a non-transparent way. Also, if there is a functioning Hadoop distribution, then Streaming should just work, without having to configure another framework on top of it. Finally, it’s trivial to send Unix commands and/or Java classes as mappers/reducers.

The disadvantage of Streaming is that everything must be done manually. The user must decide how to encode objects as keys/values (e.g., as JSON objects). Also, support for binary data is not trivial. And as mentioned above, the reducer must keep track of key boundaries manually, which can be prone to errors.

mrjob

mrjob is an open-source Python framework that wraps Hadoop Streaming and is actively developed by Yelp. Since Yelp operates entirely inside Amazon Web Services, mrjob’s integration with EMR is incredibly smooth and easy (using the boto package).

mrjob provides a Pythonic API to work with Hadoop Streaming, and allows the user to work with any objects as keys and values. By default, these objects are serialized as JSON internally, but there is also support for pickled objects. There are no other binary I/O formats available out of the box, but there is a mechanism to implement a custom serializer.
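
For example, switching the intermediate serialization from JSON to pickle is a one-line class attribute (a sketch; MyJob is a hypothetical job class):

    from mrjob.job import MRJob
    from mrjob.protocol import PickleProtocol

    class MyJob(MRJob):
        # pickle (rather than JSON-encode) the objects passed between mapper and reducer
        INTERNAL_PROTOCOL = PickleProtocol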

Significantly, mrjob appears to be very actively developed, and has great documentation.

As with all the Python frameworks, the implementation looks like pseudocode:
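
A sketch of what the job class might look like; the class name and the regular expression that parses n out of the file name are my own choices, and the listing assumes the job runs on the Hadoop cluster (so the input file name is available in the task environment):

    #!/usr/bin/env python
    import os
    import re

    from mrjob.job import MRJob
    from mrjob.protocol import RawProtocol

    class NgramNeighbors(MRJob):

        # emit the final records as plain tab-separated text rather than JSON
        OUTPUT_PROTOCOL = RawProtocol

        def mapper_init(self):
            # determine n for this block of n-grams by parsing the input file name
            input_file = os.environ.get('map_input_file',
                                        os.environ.get('mapreduce_map_input_file', ''))
            self.expected_tokens = int(
                re.search(r'(\d)gram', os.path.basename(input_file)).group(1))

        def mapper(self, _, line):
            data = line.split('\t')
            if len(data) < 3:
                return
            ngram = data[0].split()
            year = data[1]
            count = int(data[2])
            if len(ngram) != self.expected_tokens:
                return
            pair = sorted([ngram[0], ngram[-1]])
            # the key is an ordinary Python list; mrjob serializes it as JSON
            yield pair + [year], count

        def combiner(self, key, counts):
            yield key, sum(counts)

        def reducer(self, key, counts):
            yield '%s\t%s\t%s' % tuple(key), str(sum(counts))

    if __name__ == '__main__':
        NgramNeighbors.run()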

 

mrjob is only required to be installed on the client node where the job is submitted. Here are the commands to run it:
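
A sketch of the invocation, assuming the job class above is saved as ngrams.py and that HADOOP_HOME points at a CDH4 (MR1) installation:

    export HADOOP_HOME=/usr/lib/hadoop-0.20-mapreduce
    python ngrams.py -r hadoop \
        --jobconf mapred.reduce.tasks=10 \
        -o hdfs:///output-mrjob \
        hdfs:///ngrams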

 

Writing MapReduce jobs is incredibly intuitive and simple. However, there is a significant cost incurred by the internal serialization scheme. A binary scheme would most likely need to be implemented by the user (e.g., to support typedbytes). There are also some built-in utilities for log file parsing. Finally, mrjob allows the user to write multi-step MapReduce workflows, where intermediate output from one MapReduce job is automatically used as input into another MapReduce job.
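
As a sketch of the multi-step mechanism (newer mrjob releases expose it through the MRStep class; older versions used self.mr() instead, and the step bodies here are arbitrary placeholders):

    from mrjob.job import MRJob
    from mrjob.step import MRStep

    class TwoStepJob(MRJob):

        def steps(self):
            # the output of the first step automatically feeds the second
            return [
                MRStep(mapper=self.mapper_extract, reducer=self.reducer_sum),
                MRStep(reducer=self.reducer_top),
            ]

        def mapper_extract(self, _, line):
            yield line.split('\t')[0], 1

        def reducer_sum(self, key, counts):
            yield None, (sum(counts), key)

        def reducer_top(self, _, pairs):
            # keep only the most frequent key from the first step
            count, key = max(pairs)
            yield key, count

    if __name__ == '__main__':
        TwoStepJob.run()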

(Note: The rest of the implementations are all highly similar, aside from package-specific implementation details. They can all be found here.)

dumbo

dumbo is another Python framework that wraps Hadoop Streaming. It seems to enjoy relatively broad usage, but is not developed as actively as mrjob at this point. It is one of the earlier Python Hadoop APIs, and is very mature. However, its documentation is lacking, which makes it a bit harder to use.

It performs serialization with typedbytes, which allows for more compact data transfer with Hadoop, and can natively read SequenceFiles or any other file type by specifying a Java InputFormat. In fact, dumbo enables the user to execute code from any Python egg or Java JAR file.

In my experience, I had to manually install dumbo on each node of my cluster for it to work. It only worked if typedbytes and dumbo were built as Python eggs. Finally, it failed to run with a combiner, as it would terminate on MemoryErrors.

The command to run the job with dumbo is
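
A sketch of the dumbo invocation; the -hadoop location and the HDFS paths are assumptions for a CDH-style layout:

    dumbo start ngrams.py \
        -hadoop /usr \
        -input hdfs:///ngrams \
        -output hdfs:///output-dumbo \
        -numreducetasks 10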

 

hadoopy

hadoopy is another Streaming wrapper that is compatible with dumbo. Similarly, it focuses on typedbytes serialization of data, and directly writes typedbytes to HDFS.

It has a nice debugging feature, in which it can directly write messages to stdout/stderr without disrupting the Streaming process. It feels similar to dumbo, but the documentation is better. The documentation also mentions experimental Apache HBase integration.

With hadoopy, there are two ways to launch jobs:

  1. launch requires Python/hadoopy to be installed on each node in the cluster, but has very little overhead after that.
  2. launch_frozen does not even require that Python is installed on the nodes, but it incurs a ~15 second penalty for PyInstaller to work. (It’s claimed that this can be somewhat mitigated by optimizations and caching tricks.)

Jobs in hadoopy must be launched from within a Python program. There is no built-in command line utility.

I launch hadoopy via the launch_frozen scheme using my own Python script:
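
A sketch of such a launcher script; the HDFS paths and the name of the mapper/reducer script (ngrams.py) are assumptions:

    #!/usr/bin/env python
    import hadoopy

    # HDFS locations (adjust to your cluster layout)
    input_path = 'hdfs:///ngrams'
    output_path = 'hdfs:///output-hadoopy'

    if __name__ == '__main__':
        # freezes ngrams.py with PyInstaller and launches the Streaming job
        hadoopy.launch_frozen(input_path, output_path, 'ngrams.py')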

 

After running it with launch_frozen, I installed hadoopy on all the nodes and used the launch method instead. The performance was not significantly different.

pydoop

In contrast to the other frameworks, pydoop wraps Hadoop Pipes, which is a C++ API into Hadoop. The project claims that this allows it to provide a richer interface with Hadoop and HDFS, as well as better performance, but that is not clear to me. However, one advantage is the ability to implement a Python Partitioner, RecordReader, and RecordWriter. All input/output must be strings.

Most importantly, I could not successfully build pydoop via pip or directly from source.

Others

  • happy is a framework for writing Hadoop jobs through Jython, but seems to be dead.
  • Disco is a full-blown non-Hadoop reimplementation of MapReduce. Its core is written in Erlang, with the primary API in Python. It is developed at Nokia, but is much less used than Hadoop.
  • octopy is a reimplementation of MapReduce purely in Python in a single source file. It is not intended for “serious” computation.
  • Mortar is another option for working with Python that was just recently launched. Through a web app, the user can submit Apache Pig or Python jobs to manipulate data sitting in Amazon S3.
  • There are several higher-level interfaces into the Hadoop ecosystem, such as Apache Hive and Pig. Pig provides the facility to write user-defined functions with Python, but it appears to run them through Jython. Hive also has a Python wrapper called hipy.
  • (Added Jan. 7 2013) Luigi is a Python framework for managing multistep batch job pipelines/workflows. It is somewhat similar to Apache Oozie, but it has some built-in functionality for wrapping Hadoop Streaming jobs (though it appears to be a light wrapper). Luigi has a nice feature of extracting a Python traceback when your Python code crashes a job, and it also has nice command-line features. It has a great introductory README file but seems to lack comprehensive reference documentation. Luigi is actively developed and used at Spotify for running many jobs there.

Native Java

Finally, I implemented the MR job using the new Hadoop Java API. After building it, I ran it like so:
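
The invocation follows the standard hadoop jar form; the jar name, driver class, and HDFS paths below are placeholders for whatever your build produces:

    hadoop jar ngrams.jar NgramsDriver hdfs:///ngrams hdfs:///output-native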

 

A Note About Counters

In my initial implementations of these MR jobs, I used counters to keep track of the number of bad records. In Streaming, this requires writing messages to stderr. It turns out this incurs a significant overhead: the Streaming job took 3.4x longer than the native Java job. The frameworks were similarly penalized.
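
For reference, a Streaming task updates a counter by writing a specially formatted line to stderr, for example:

    import sys

    # increment the "bad_records" counter in the "ngrams" group by 1
    # (the group and counter names here are arbitrary)
    sys.stderr.write('reporter:counter:ngrams,bad_records,1\n')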

Performance Comparison

The Java implementation serves as the performance baseline; all values for the Python frameworks are expressed as ratios relative to the corresponding Java job.

Java is obviously the fastest, with Streaming taking 50% longer, and the Python frameworks taking substantially longer still. From a profile of the mrjob mapper, it appears a substantial amount of time is spent in serialization/deserialization. The binary formats in dumbo and hadoopy may ameliorate the problem. The dumbo implementation may have been faster if the combiner was allowed to run.

Feature Comparison

Mostly gleaned from the respective packages’ documentation or code repositories.

Conclusions

Streaming appears to be the fastest Python solution, without any magic under the hood. However, it requires care when implementing the reducer, and also when working with more complex objects.

All the Python frameworks look like pseudocode, which is a huge plus.

mrjob seems highly active, easy-to-use, and mature. It makes multistep MapReduce flows easy, and can easily work with complex objects. It also works seamlessly with EMR. But it appears to perform the slowest.

The other Python frameworks appear to be somewhat less popular. Their main advantage appears to be built-in support for binary formats, but this is probably something that can be implemented by the user, if it matters.

So for the time being:

  • Prefer Hadoop Streaming if possible. It’s easy enough, as long as care is taken with the reducer.
  • Prefer mrjob to rapidly get on Amazon EMR, at the cost of significant computational overhead.
  • Prefer dumbo for more complex jobs that may include complex keys and multistep MapReduce workflows; it’s slower than Streaming but faster than mrjob.

If you have your own observations from practice, or for that matter any errors to point out, please share them in the comments.

Update (10/15/2014): See the presentation below for updates about this topic:

Uri Laserson is a data scientist at Cloudera.


33 Responses
  • Steve Johnson / January 07, 2013 / 11:25 AM

    I left a couple of comments and corrections over at the HN thread about this post. http://news.ycombinator.com/item?id=5022625

  • Luca Pireddu / January 07, 2013 / 11:29 AM

    Hi Uri,

    what sort of problems did you have installing Pydoop? You may be missing some dependencies (see http://pydoop.sourceforge.net/docs/installation.html).

    Interestingly, once Pydoop is installed, with ‘pydoop script’ you could almost run your pseudo code exactly as it is.

  • Uri Laserson / January 07, 2013 / 12:06 PM

    Luca, I believe I have all the required dependencies for pydoop. The pip output is here:
    https://gist.github.com/4477947

    And the output from building from source:
    https://gist.github.com/4477933

  • Luca Pireddu / January 07, 2013 / 1:34 PM

    Hmmm…you’re trying to build Pydoop on Mac OS X!

    That’s the first time we see anyone trying this. Mac OS X isn’t a supported Hadoop or Pydoop platform, hence the compilation issues.

    I looked at the setup output you posted. There seems to be some sort of incompatibility in boost-python. Unfortunately I don’t have a Mac to try debugging your problem.

    Nevertheless, I’ve searched the Internet for your error and it seems that you’re not the first one to encounter it. One project got around the problem simply reordering their #includes: https://groups.google.com/d/msg/pythonvision/eVu5I4vzDfw/pi894YBwEMgJ
    A stab in the dark could be to edit pydoop/src/pipes_context.hpp and reverse the order of the lines
    #include
    #include

    Another project had a more sophisticated solution: http://goo.gl/zCgTc [code.google.com]

    Do you want to try? If you manage to build it, we'd be happy to integrate your changes into Pydoop.

  • Luca Pireddu / January 07, 2013 / 1:37 PM

    Argg….I didn’t mean to be that vague with the “stab in the dark”. The blog engine ate my include lines.

    The include lines are numbers 27 and 28: hadoop/Pipes.hh and boost/python.hpp

  • Klaas / January 07, 2013 / 3:41 PM

    Nice post!

    For what it’s worth, dumbo is a lot faster when using ctypedbytes and the memory limit safeguards can easily be tweaked.

    I’d also be interested to know what things in particular you found to be missing in the docs. I don’t think we’ll have every little detail documented anytime soon, but maybe there’s some low hanging fruit that could be rectified with realistic effort…

  • Jim Blomo / January 07, 2013 / 4:52 PM

    Jim from Yelp here. Thanks for the wonderful write-up! For those interested in more information, we’re hosting the Hadoop meetup this Wednesday and talking about the future of mrjob: http://www.meetup.com/hadoopsf/events/95687012/

    Regarding serialization performance, Steve Johnson points out in the HN comments this is likely caused by the default Python JSON library. Switching to simplejson may speed things up significantly, and it is what we use at Yelp.

  • Ivan Savov / January 07, 2013 / 8:06 PM

    Thanks for the research and the sample code. This is something I will definitely have to play with.

    Is there any chance you could add a groovy version? I hear that it is almost as good as python (short and expressive) but it can use all the Java stuff natively.

  • TG / January 07, 2013 / 10:00 PM

    Hi,

    Terrific blog post! I ran the vanilla Python streaming code on CDH4 and received this error:

    Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads

    Do you have any experience running streaming jobs on CDH4? Any idea what might be causing this? Thanks!

  • Justin Kestelyn (@kestelyn) / January 08, 2013 / 1:20 PM

    TG,

    I recommend that you post your question/error to the cdh-user@cloudera.org list.

  • Simone Leo / January 11, 2013 / 9:03 AM

    Hello,

    It is not quite clear to me why you tried building Pydoop on Mac OS X: isn’t your test environment made up of CentOS boxes? CentOS is an operating system we’ve supported right from the start.

    Anyway, we worked on Mac support and released a new version that builds on OS X Mountain Lion. Here is the link to the installation instructions:

    http://pydoop.sourceforge.net/docs/installation.html#osx

    This is how you write the n-gram program with pydoop script:

    def mapper(_, record, writer):
        record = record.split("\t", 3)
        ngram = record[0].split()
        writer.emit("\t".join(sorted((ngram[0], ngram[-1])) + [record[1]]), record[2])

    def reducer(key, ivalue, writer):
        writer.emit(key, sum(map(int, ivalue)))

    And this is how you run it (assuming you saved the code to a file named “ngram_aggregation.py”):

    pydoop script ngram_aggregation.py input output

    You can get the new release with a git pull. It would be great if you could update this survey with Pydoop data.

    Simone

  • Simone Leo / January 11, 2013 / 9:06 AM

    The blog engine removed all indentation from the code :( Everything except the def lines should be indented one level.

    Simone

  • Uri Laserson (@laserson) / January 11, 2013 / 11:35 AM

    Simone, I had the exact same errors on the CentOS machines. I only compiled it on my local machine to generate the errors to copy/paste.

  • Gianluigi Zanetti / January 11, 2013 / 2:55 PM

    Hi Uri.
    Could you please give us details on the errors you had on the CentOS machines? We have multiple installations running on configurations nominally similar to the one you tried, so it would be very interesting to know what went wrong in your case.

    By the way, as Simone was saying, pydoop 0.8 supports OS X Mountain Lion so it would be very nice if you could try a re-install.

  • Simone Leo / January 14, 2013 / 1:59 AM

    Yes, the CentOS build log would definitely help. By the way, we have an active help forum here:

    http://sourceforge.net/p/pydoop/discussion/990018

    Simone

  • Steve Kannan / January 17, 2013 / 5:31 PM

    Great post! I have actually been doing some similar research myself.

    Do you know if it’s possible to write and run a MapReduce job using Jython and the standard Java MapReduce API? It is frequently suggested as a viable option, but I haven’t found anyone who has actually done it. As you point out, the happy project appears to be dead. Additionally, the word_count.py example included in most Hadoop distributions does not seem to work. It relies on jythonc, which was deprecated years ago. I got the old version of Jython with jythonc and compiled it anyway, but the resultant jar would not run on the TaskTrackers.

    Others seem to have had the same experience (http://sourceforge.net/mailarchive/forum.php?forum_name=jython-users&max_rows=25&style=nested&viewmonth=200809).

    Is this an option you explored?
    Thanks

    • Uri Laserson (@laserson) / January 17, 2013 / 10:40 PM

      I believe that Pig lets you write Python UDFs that get run through Jython. I think requiring Jython is a huge roadblock for many people, since they want to use their favorite Python modules that are probably not compatible (e.g., NumPy). Mortar Data is a new player that makes it easy to write Pig scripts and Python code to work on data in S3. According to their website, it should be possible to use all of the Python scientific stack (NumPy, SciPy, etc.).

  • Steve Kannan / January 18, 2013 / 10:12 AM

    Thanks for the info. I totally agree that Jython is limited in many respects. I was trying to test the basic Jython MRs for the sake of completeness, but I don’t think they are usable. Is it safe to assume that the included examples (WordCount.py and JythonAbacus.py) are deprecated and Jython (outside of Pig) is not supported?

  • Angelica Pando / March 11, 2013 / 8:52 PM

    FWIW, I also can’t build pydoop. Pretty unfortunate, since I really wanted to give it a try.

    Same error as Uri, trying to do pip install: https://gist.github.com/laserson/4477947

    And, like Uri, I’m *not* trying to install on Mac OS X

    $ cat /etc/redhat-release
    CentOS release 5.6 (Final)

    Also tried to post in the help forum mentioned above but I guess I can’t create any topics. Oh well.

  • Simone Leo / March 19, 2013 / 9:42 AM

    Hi Angelica,

    unfortunately the pip install issue has been around for a while, but it should be solved now. You still have to install all prerequisites manually as pip cannot handle them.

    Regarding your inability to post on the forum, I have no idea what happened. It’s been used by a lot of different users now. Have you tried contacting the Sourceforge tech support?

    Simone

  • Pythoner / May 02, 2013 / 8:02 AM

    what IDE do you use for python?

  • Matthew Cornell / October 16, 2013 / 5:58 AM

    Thanks very much for the article. FYI The Hadoop Streaming link http://hadoop.apache.org/docs/r0.15.2/streaming.html is broken. Here’s the most recent: http://hadoop.apache.org/docs/stable/streaming.html

  • Justin Kestelyn (@kestelyn) / October 16, 2013 / 8:10 AM

    Matthew,

    Thanks for pointing that out, corrected.

  • AJ Rader / November 25, 2013 / 9:14 AM

    I liked this writeup when I first saw it and have come back to try to get mrjob working. I keep getting this error however:
    IOError: Could not check path hdfs:///test/pg5000.txt
    Any suggestions on what could be causing this?
    thanks,

    • Uri Laserson (@laserson) / November 25, 2013 / 1:26 PM

      Hi AJ,

      Glad the post is helpful. I looked briefly at the mrjob code, and seems that the call to invoke_hadoop is raising a CalledProcessError, which leads to your error. I imagine this could happen if you can’t successfully run hadoop fs -ls in the context that the mrjob code is running. Or perhaps your hadoop configuration files (e.g., site.xml or something like that) are not configured to correctly point to the cluster. Perhaps it would help to explicitly add the hostname of the hadoop cluster to the hdfs:// path? Hope this helps! Also, if you have followup questions, would you mind moving this to one of the user mailing lists? (either mrjob’s mailing list or github issue tracker, or hadoop-users or cdh-users)? Thanks! –Uri

  • Kevin Davenport (@KevinLDavenport) / December 21, 2013 / 4:38 PM

    Great post, thanks for sharing. I look forward to trying to implement sci-kit learn in one of these frameworks.

    • Uri Laserson (@laserson) / December 22, 2013 / 11:59 AM

      Thanks, Kevin! Please keep us posted on any progress. Also, if you’re not familiar with it, you should check out impyla and PySpark as well.
