A Guide to Python Frameworks for Hadoop
I recently joined Cloudera after working in computational biology/genomics for close to a decade. My analytical work is primarily performed in Python, along with its fantastic scientific stack. It was quite jarring to find out that the Apache Hadoop ecosystem is primarily written in/for Java. So my first order of business was to investigate some of the options that exist for working with Hadoop from Python.
In this post, I will provide an unscientific, ad hoc review of my experiences with some of the Python frameworks that exist for working with Hadoop, including:
- Hadoop Streaming
- mrjob
- dumbo
- hadoopy
- pydoop
- and others
Ultimately, in my analysis, Hadoop Streaming is the fastest and most transparent option, and the best one for text processing. mrjob is best for rapidly working on Amazon EMR, but incurs a significant performance penalty. dumbo is convenient for more complex jobs (objects as keys; multistep MapReduce) without incurring as much overhead as mrjob, but it’s still slower than Streaming.
Read on for implementation details, performance comparisons, and feature comparisons.
Toy Problem Definition
To test out the different frameworks, we will not be doing “word count”. Instead, we will be transforming the Google Books Ngram data. An n-gram is a synonym for a tuple of n words. The n-gram data set provides counts for every single 1-, 2-, 3-, 4-, and 5-gram observed in the Google Books corpus grouped by year. Each row in the n-gram data set is composed of 3 fields: the n-gram, the year, and the number of observations. (You can explore the data interactively here.)
We would like to aggregate the data to count the number of times any pair of words are observed near each other, grouped by year. This would allow us to determine if any pair of words are statistically near each other more often than we would expect by chance. Two words are “near” if they are observed within 4 words of each other. Or equivalently, two words are near each other if they appear together in any 2-, 3-, 4-, or 5-gram. So a row in the resulting data set would be comprised of a 2-gram, a year, and a count.
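To make the "near" definition concrete, here is a brute-force sketch in plain Python, independent of the n-gram data (the function is illustrative only, not part of the MapReduce implementation):

```python
from collections import Counter

def near_pairs(tokens, year, max_dist=4):
    # count every pair of words observed within max_dist words of each other;
    # each pair is sorted lexicographically and keyed by (word1, word2, year)
    counts = Counter()
    for i, w1 in enumerate(tokens):
        for w2 in tokens[i + 1:i + 1 + max_dist]:
            word1, word2 = sorted([w1, w2])
            counts[(word1, word2, year)] += 1
    return counts
```

For example, in "the cat in the hat", the pair ("cat", "the") is near twice: once via "the cat" and once via "cat in the".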
There is one subtlety that must be addressed. The n-gram data set for each value of n is computed across the whole Google Books corpus. In principle, given the 5-gram data set, I could compute the 4-, 3-, and 2-gram data sets simply by aggregating over the correct n-grams. For example, if the 5-gram data set contains
(the, cat, in, the, hat)        1999    20
(the, cat, is, on, youtube)     1999    13
(how, are, you, doing, today)   1986    5000
then we could aggregate this into 2-grams which would result in records like
(the, cat) 1999 33 // i.e., 20 + 13
However, in practice, Google only includes an n-gram if it is observed more than 40 times across the entire corpus. So while a particular 5-gram may be too rare to meet the 40-occurrence threshold, the 2-grams it is composed of may be common enough to break the threshold in the Google-supplied 2-gram data. For this reason, we use the 2-gram data for words that are next to each other, the 3-gram data for pairs of words that are separated by one word, the 4-gram data for pairs of words that are separated by 2 words, etc. In other words, given the 2-gram data, the only additional information the 3-gram data provide are the outermost words of the 3-gram. In addition to being more sensitive to potentially rare n-grams, using only the outermost words of the n-grams helps ensure we avoid double counting. In total, we will be running our computation on the combination of 2-, 3-, 4-, and 5-gram data sets.
The MapReduce pseudocode to implement this solution would look like so:
def map(record):
    (ngram, year, count) = unpack(record)
    // ensure word1 has the lexicographically first word:
    (word1, word2) = sorted(ngram[first], ngram[last])
    key = (word1, word2, year)
    emit(key, count)

def reduce(key, values):
    emit(key, sum(values))
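This pseudocode can be made runnable in plain Python; the helper names below are mine, not part of any of the implementations discussed later:

```python
def parse_record(line):
    # a record is: the n-gram (space-separated words), the year, and the
    # count, with the three fields separated by tabs
    ngram, year, count = line.split('\t')
    return ngram.split(), int(year), int(count)

def map_record(line):
    # ensure word1 is the lexicographically first of the two outermost words
    tokens, year, count = parse_record(line)
    word1, word2 = sorted([tokens[0], tokens[-1]])
    return (word1, word2, year), count
```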
These MapReduce jobs are executed on a ~20 GB random subset of the data. The full data set is split across 1500 files; we select a random subset of the files using this script. The filenames remain intact, which is important because the filename identifies the value of n in the n-grams for that chunk of data.
The Hadoop cluster comprises five virtual nodes running CentOS 6.2 x64, each with 4 CPUs, 10 GB RAM, 100 GB disk, running CDH4. The cluster can execute 20 maps at a time, and each job is set to run with 10 reducers.
The software versions I worked with on the cluster were as follows:
- Hadoop: 2.0.0-cdh4.1.2
- Python: 2.6.6
- mrjob: 0.4-dev
- dumbo: 0.21.36
- hadoopy: 0.6.0
- pydoop: 0.7 (PyPI) and the latest version from the git repository
- Java: 1.6
Most of the Python frameworks wrap Hadoop Streaming, while others wrap Hadoop Pipes or implement their own alternatives. Below, I will discuss my experience with a number of tools for using Python to write Hadoop jobs, along with a final comparison of performance and features. One of the features I am interested in is the ease of getting up and running, so I did not attempt to optimize the performance of the individual packages.
As with every large data set, there are bad records. We check for a few kinds of errors in each record including missing fields and wrong n-gram size. For the latter case, we must know the name of the file that is being processed in order to determine the expected n-gram size.
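For example, the expected n-gram size can be recovered from the filename with a small regular expression (the example filename follows the Google Books naming scheme; treat it as illustrative):

```python
import os
import re

def expected_ngram_size(path):
    # the filename encodes the value of n, e.g. '...-4gram-...' -> 4
    match = re.search(r'(\d+)gram', os.path.basename(path))
    return int(match.group(1)) if match else None
```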
All the code is available in this GitHub repo.
Hadoop Streaming

Hadoop Streaming is the canonical way of supplying any executable to Hadoop as a mapper or reducer, including standard Unix tools or Python scripts. The executable must read from stdin and write to stdout using agreed-upon semantics. One of the disadvantages of using Streaming directly is that while the inputs to the reducer are grouped by key, they are still iterated over line-by-line, and the boundaries between keys must be detected by the user.
Here is the code for the mapper:
#! /usr/bin/env python

import os
import re
import sys

# determine value of n in the current block of ngrams by parsing the filename
input_file = os.environ['map_input_file']
expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])

for line in sys.stdin:
    data = line.split('\t')

    # perform some error checking
    if len(data) < 3:
        continue

    # unpack data
    ngram = data[0].split()
    year = data[1]
    count = data[2]

    # more error checking
    if len(ngram) != expected_tokens:
        continue

    # build key and emit
    pair = sorted([ngram[0], ngram[expected_tokens - 1]])
    print >>sys.stdout, "%s\t%s\t%s\t%s" % (pair[0], pair[1], year, count)
And here is the reducer:
#! /usr/bin/env python

import sys

total = 0
prev_key = False

for line in sys.stdin:
    data = line.split('\t')
    curr_key = '\t'.join(data[:3])
    count = int(data[3])

    # found a boundary; emit current sum
    if prev_key and curr_key != prev_key:
        print >>sys.stdout, "%s\t%i" % (prev_key, total)
        prev_key = curr_key
        total = count
    # same key; accumulate sum
    else:
        prev_key = curr_key
        total += count

# emit last key
if prev_key:
    print >>sys.stdout, "%s\t%i" % (prev_key, total)
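The manual boundary tracking in the reducer can also be expressed with itertools.groupby, which exploits the fact that Streaming hands the reducer its input already sorted by key. This is an alternative sketch, not the code used in the benchmarks:

```python
from itertools import groupby
from operator import itemgetter

def parse_lines(stream):
    # each line is: three tab-separated key fields, then the count
    for line in stream:
        data = line.rstrip('\n').split('\t')
        yield '\t'.join(data[:3]), int(data[3])

def reduce_stream(stdin, stdout):
    # groupby detects the key boundaries for us, since input is sorted by key
    for key, group in groupby(parse_lines(stdin), key=itemgetter(0)):
        stdout.write("%s\t%i\n" % (key, sum(count for _, count in group)))
```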
Hadoop Streaming separates the key and value with a tab character by default. Because we also separate the fields of our key with tab characters, we must tell Hadoop that the first three fields are all part of the key by passing these options:
-jobconf stream.num.map.output.key.fields=3
-jobconf stream.num.reduce.output.key.fields=3
The command to execute the Hadoop job is
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.2.jar \
        -input /ngrams \
        -output /output-streaming \
        -mapper mapper.py \
        -combiner reducer.py \
        -reducer reducer.py \
        -jobconf stream.num.map.output.key.fields=3 \
        -jobconf stream.num.reduce.output.key.fields=3 \
        -jobconf mapred.reduce.tasks=10 \
        -file mapper.py \
        -file reducer.py
Note that mapper.py and reducer.py must each be specified twice on the command line: the first time points Hadoop at the executables, while the second time tells Hadoop to distribute the executables around to all the nodes in the cluster.
Hadoop Streaming is clean and very transparent about what is happening under the hood. In contrast, the Python frameworks all perform their own serialization/deserialization that can consume additional resources in a non-transparent way. Also, if there is a functioning Hadoop distribution, then Streaming should just work, without having to configure another framework on top of it. Finally, it’s trivial to send Unix commands and/or Java classes as mappers/reducers.
The disadvantage of Streaming is that everything must be done manually. The user must decide how to encode objects as keys/values (e.g., as JSON objects). Also, support for binary data is not trivial. And as mentioned above, the reducer must keep track of key boundaries manually, which can be prone to errors.
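For instance, a composite key can be round-tripped through JSON, at the cost of doing the encoding and decoding yourself. This is just one possible scheme (the helper names are mine):

```python
import json

def encode_kv(key_obj, value):
    # serialize the key object to a single tab-free JSON string,
    # so Hadoop's default tab-delimited key/value split still works
    return "%s\t%s" % (json.dumps(key_obj), value)

def decode_kv(line):
    key_json, _, value = line.rstrip('\n').partition('\t')
    return json.loads(key_json), int(value)
```

Note that Hadoop will then sort and group on the JSON text itself, so the encoding must be deterministic for equal keys to land in the same reduce group.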
mrjob

mrjob is an open-source Python framework that wraps Hadoop Streaming and is actively developed by Yelp. Since Yelp operates entirely inside Amazon Web Services, mrjob’s integration with EMR is incredibly smooth and easy (using the boto package).
mrjob provides a pythonic API to work with Hadoop Streaming, and allows the user to work with any objects as keys and values. By default, these objects are serialized as JSON objects internally, but there is also support for pickled objects. There are no other binary I/O formats available out of the box, but there is a mechanism to implement a custom serializer.
Significantly, mrjob appears to be very actively developed, and has great documentation.
As with all the Python frameworks, the implementation looks like pseudocode:
#! /usr/bin/env python

import os
import re

from mrjob.job import MRJob
from mrjob.protocol import RawProtocol, ReprProtocol

class NgramNeighbors(MRJob):

    # mrjob allows you to specify input/intermediate/output serialization
    # default output protocol is JSON; here we set it to text
    OUTPUT_PROTOCOL = RawProtocol

    def mapper_init(self):
        # determine value of n in the current block of ngrams by parsing filename
        input_file = os.environ['map_input_file']
        self.expected_tokens = int(re.findall(r'([\d]+)gram', os.path.basename(input_file))[0])

    def mapper(self, key, line):
        data = line.split('\t')

        # error checking
        if len(data) < 3:
            return

        # unpack data
        ngram = data[0].split()
        year = data[1]
        count = int(data[2])

        # more error checking
        if len(ngram) != self.expected_tokens:
            return

        # generate key
        pair = sorted([ngram[0], ngram[self.expected_tokens - 1]])
        k = pair + [year]

        # note that the key is an object (a list in this case)
        # that mrjob will serialize as JSON text
        yield (k, count)

    def combiner(self, key, counts):
        # the combiner must be separate from the reducer because the input
        # and output must both be JSON
        yield (key, sum(counts))

    def reducer(self, key, counts):
        # the final output is encoded as text
        yield "%s\t%s\t%s" % tuple(key), str(sum(counts))

if __name__ == '__main__':
    # sets up a runner, based on command line options
    NgramNeighbors.run()
mrjob is only required to be installed on the client node where the job is submitted. Here are the commands to run it:
export HADOOP_HOME="/usr/lib/hadoop-0.20-mapreduce"
./ngrams.py -r hadoop --hadoop-bin /usr/bin/hadoop --jobconf mapred.reduce.tasks=10 -o hdfs:///output-mrjob hdfs:///ngrams
Writing MapReduce jobs is incredibly intuitive and simple. However, there is a significant cost incurred by the internal serialization scheme. A binary scheme would most likely need to be implemented by the user (e.g., to support typedbytes). There are also some built-in utilities for log file parsing. Finally, mrjob allows the user to write multi-step MapReduce workflows, where intermediate output from one MapReduce job is automatically used as input into another MapReduce job.
(Note: The rest of the implementations are all highly similar, aside from package-specific implementation details. They can all be found here.)
dumbo

dumbo is another Python framework that wraps Hadoop Streaming. It seems to enjoy relatively broad usage, but is not developed as actively as mrjob at this point. It is one of the earlier Python Hadoop APIs, and is very mature. However, its documentation is lacking, which makes it a bit harder to use.
It performs serialization with typedbytes, which allows for more compact data transfer with Hadoop, and can natively read SequenceFiles or any other file type by specifying a Java InputFormat. In fact, dumbo enables the user to execute code from any Python egg or Java JAR file.
In my experience, I had to manually install dumbo on each node of my cluster for it to work. It only worked if typedbytes and dumbo were built as Python eggs. Finally, it failed to run with a combiner, as the job would terminate with errors.
The command to run the job with dumbo is
dumbo start ngrams.py \
        -hadoop /usr \
        -hadooplib /usr/lib/hadoop-0.20-mapreduce/contrib/streaming \
        -numreducetasks 10 \
        -input hdfs:///ngrams \
        -output hdfs:///output-dumbo \
        -outputformat text \
        -inputformat text
hadoopy

hadoopy is another Streaming wrapper that is compatible with dumbo. Similarly, it focuses on typedbytes serialization of data, and directly writes typedbytes to HDFS.
It has a nice debugging feature, in which it can directly write messages to stderr without disrupting the Streaming process. It feels similar to dumbo, but the documentation is better. The documentation also mentions experimental Apache HBase integration.
With hadoopy, there are two ways to launch jobs:
- launch requires Python/hadoopy to be installed on each node in the cluster, but it has very little overhead after that.
- launch_frozen does not even require that Python is installed on the nodes, but it incurs a ~15 second penalty for PyInstaller to work. (It’s claimed that this can be somewhat mitigated by optimizations and caching tricks.)
Jobs in hadoopy must be launched from within a Python program. There is no built-in command line utility.
I launch hadoopy via the launch_frozen scheme using my own Python script (included in the GitHub repo mentioned above).
After running it with launch_frozen, I installed hadoopy on all nodes and used the launch method instead. The performance was not significantly different.
pydoop

In contrast to the other frameworks, pydoop wraps Hadoop Pipes, which is a C++ API into Hadoop. The project claims that it can provide a richer interface with Hadoop and HDFS as a result, as well as better performance, but this is not clear to me. However, one advantage is the ability to implement a Python RecordWriter. All input/output must be strings.
Most importantly, I could not successfully build pydoop via pip or directly from source.
Special Mentions

- happy is a framework for writing Hadoop jobs through Jython, but seems to be dead.
- Disco is a full-blown non-Hadoop reimplementation of MapReduce. Its core is written in Erlang, with the primary API in Python. It is developed at Nokia, but is much less used than Hadoop.
- octopy is a reimplementation of MapReduce purely in Python in a single source file. It is not intended for “serious” computation.
- Mortar is another option for working with Python that was just recently launched. Through a web app, the user can submit Apache Pig or Python jobs to manipulate data sitting in Amazon S3.
- There are several higher-level interfaces into the Hadoop ecosystem, such as Apache Hive and Pig. Pig provides the facility to write user-defined-functions with Python, but it appears to run them through Jython. Hive also has a Python wrapper called hipy.
- (Added Jan. 7 2013) Luigi is a Python framework for managing multistep batch job pipelines/workflows. It is probably a bit similar to Apache Oozie but it has some built-in functionality for wrapping Hadoop Streaming jobs (though it appears to be a light wrapper). Luigi has a nice feature of extracting out a Python traceback if your Python code crashes a job, and also has nice command-line features. It has a great introductory README file but seems to lack comprehensive reference documentation. Luigi is actively developed and used at Spotify for running many jobs there.
Native Java

Finally, I implemented the MR job using the new Hadoop Java API. After building it, I ran it like so:
hadoop jar /root/ngrams/native/target/NgramsComparison-0.0.1-SNAPSHOT.jar NgramsDriver hdfs:///ngrams hdfs:///output-native
A Note About Counters
In my initial implementations of these MR jobs, I used counters to keep track of the number of bad records. In Streaming, this requires writing messages to stderr. It turns out this incurs a significant overhead: the Streaming job took 3.4x longer than the native Java job. The frameworks were similarly penalized.
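In Streaming, a counter update is just a specially formatted line written to stderr, which Hadoop scans for the reporter protocol. A minimal helper (the function name is mine):

```python
import sys

def increment_counter(group, counter, amount=1, stream=None):
    # Hadoop Streaming picks up counter updates from stderr lines of the
    # form 'reporter:counter:<group>,<counter>,<amount>'
    stream = stream or sys.stderr
    stream.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))
```

Each call is a separate line on stderr, which is why heavy use of counters (e.g., one update per bad record) adds so much overhead to a Streaming job.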
The MapReduce job was also implemented in Java as a baseline for performance. All values for the Python frameworks are ratios relative to the corresponding Java performance.
Java is obviously the fastest, with Streaming taking 50% longer, and the Python frameworks taking substantially longer still. From a profile of the mrjob mapper, it appears a substantial amount of time is spent in serialization/deserialization. The binary formats in dumbo and hadoopy may ameliorate the problem. The dumbo implementation may have been faster if the combiner was allowed to run.
The feature comparisons above were mostly gleaned from the respective packages’ documentation or code repositories.
Conclusions

Streaming appears to be the fastest Python solution, without any magic under the hood. However, it requires care when implementing the reducer, and also when working with more complex objects.
All the Python frameworks look like pseudocode, which is a huge plus.
mrjob seems highly active, easy-to-use, and mature. It makes multistep MapReduce flows easy, and can easily work with complex objects. It also works seamlessly with EMR. But it appears to perform the slowest.
The other Python frameworks appear to be somewhat less popular. Their main advantage appears to be built-in support for binary formats, but this is probably something that can be implemented by the user, if it matters.
So for the time being:
- Prefer Hadoop Streaming if possible. It’s easy enough, as long as care is taken with the reducer.
- Prefer mrjob to rapidly get on Amazon EMR, at the cost of significant computational overhead.
- Prefer dumbo for more complex jobs that may include complex keys and multistep MapReduce workflows; it’s slower than Streaming but faster than mrjob.
If you have your own observations based on practice, or for that matter any errors to point out, please do so in the comments.
Uri Laserson is a data scientist at Cloudera.