Introducing Crunch: Easy MapReduce Pipelines for Apache Hadoop

As a data scientist at Cloudera, I work with customers across a wide range of industries that use Apache Hadoop to solve their business problems. Many of the solutions we create involve multi-stage pipelines of MapReduce jobs that join, clean, aggregate, and analyze enormous amounts of data. When working with log files or relational database tables, we use high-level tools like Apache Pig and Apache Hive for their convenient and powerful support for creating pipelines over structured and semi-structured records.

As Hadoop has spread from web companies to other industries, the variety of data stored in HDFS has expanded dramatically. Hadoop clusters are being used to process satellite images, time series data, audio files, and seismograms. These formats are not a natural fit for the data schemas imposed by Pig and Hive, much as structured binary data can be awkward to work with in a relational database. For these use cases, we either end up writing large, custom libraries of user-defined functions in Pig or Hive, or we give up on our high-level tools entirely and go back to writing raw MapReduce jobs in Java. Either option is a serious drain on developer productivity.

Today, we’re pleased to introduce Crunch, a Java library that aims to make writing, testing, and running MapReduce pipelines easy, efficient, and even fun. Crunch’s design is modeled after Google’s FlumeJava, focusing on a small set of simple primitive operations and lightweight user-defined functions that can be combined to create complex, multi-stage pipelines. At runtime, Crunch compiles the pipeline into a sequence of MapReduce jobs and manages their execution.

Example

Let’s take a look at the classic WordCount MapReduce, written using Crunch:

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.type.writable.Writables;

public class WordCount {
  public static void main(String[] args) throws Exception {
    // Create an object to coordinate pipeline creation and execution.
    Pipeline pipeline = new MRPipeline(WordCount.class);
    // Reference a given text file as a collection of Strings.
    PCollection<String> lines = pipeline.readTextFile(args[0]);

    // Define a function that splits each line in a PCollection of Strings into a
    // PCollection made up of the individual words in the file.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Writables.strings()); // Indicates the serialization format

    // The Aggregate.count method applies a series of Crunch primitives and returns
    // a PTable mapping each unique word in the input PCollection to its count.
    // Best of all, the count() function doesn't need to know anything about
    // the kind of data stored in the input PCollection.
    PTable<String, Long> counts = Aggregate.count(words);

    // Instruct the pipeline to write the resulting counts to a text file.
    pipeline.writeTextFile(counts, args[1]);
    // Execute the pipeline as a MapReduce.
    pipeline.done();
  }
}

Advantages

  1. It’s just Java. Crunch shares a core philosophical belief with Google’s FlumeJava: novelty is the enemy of adoption. For developers, learning a Java library requires much less up-front investment than learning a new programming language. Crunch provides full access to the power of Java for writing functions, managing pipeline execution, and dynamically constructing new pipelines, obviating the need to switch back and forth between a data flow language and a real programming language.
  2. Natural type system. Crunch supports reading and writing data that is stored using Hadoop’s Writable format or Apache Avro records. You do not need to write code that maps data stored in these formats into Crunch’s type system; both are supported natively. You can even mix and match Writable and Avro types within a single MapReduce: changing the Writables.strings() call to Avros.strings() in the WordCount example will run the MapReduce using Avro serialization instead of Writables (see the sketch after this list).
  3. A modular library released under the Apache License. Experts in machine learning, text mining, and ETL can craft libraries using Crunch’s data model, and other developers can use those libraries to build custom pipelines that operate on their data. For example, Crunch can be used to create the glue code that converts raw data into the structured input that a machine learning algorithm expects, and Crunch will compile the glue code and the machine learning algorithm into a single MapReduce.
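
For instance, here is the WordCount pipeline switched over to Avro serialization. This is a minimal sketch rather than tested code: it assumes the Avros factory class lives in a com.cloudera.crunch.type.avro package, mirroring the type.writable.Writables class used above; the rest of the pipeline is unchanged.

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.type.avro.Avros; // assumed package, mirroring type.writable.Writables

public class AvroWordCount {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = new MRPipeline(AvroWordCount.class);
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // Same word-splitting logic as before; only the PType argument changes.
    PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
      public void process(String line, Emitter<String> emitter) {
        for (String word : line.split("\\s+")) {
          emitter.emit(word);
        }
      }
    }, Avros.strings()); // the only change: Avro serialization instead of Writables
    PTable<String, Long> counts = Aggregate.count(words);
    pipeline.writeTextFile(counts, args[1]);
    pipeline.done();
  }
}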

Future Work

We are releasing Crunch as a development project, not a product. We’re eager for developers to play with it and tell us what they like and what they dislike. You can get started with Crunch by downloading it from Cloudera’s GitHub repository at https://github.com/cloudera/crunch.

We have tested the library on a number of our use cases, but there will be bugs and rough edges that we will work out in the coming months. We gladly welcome contributions from the Hadoop ecosystem to help us improve Crunch as we prepare it for submission to the Apache Incubator, especially around:

  • More efficient MapReduce compilation, including cost-based optimization,
  • Support for HBase and HCatalog as data sources/targets,
  • Tools and examples that build Crunch pipelines in other JVM languages, such as Scala, JRuby, Clojure, and Jython.

9 Responses
  • Soren Macbeth / October 10, 2011 / 1:38 PM

    I’d love to see a compare and contrast between Crunch and Cascading.

  • Praveen / October 11, 2011 / 3:50 AM

    >Today, we’re pleased to introduce Crunch, a Java library that aims to make writing, testing, and running MapReduce pipelines easy, efficient, and even fun.

    What is a MapReduce pipeline?

  • Julian Cole / October 12, 2011 / 7:43 AM

    Nice explanation. It would probably help some people to add a brief description of what a map-reduce pipeline is, but aside from that, it’s very clear how this example does a “divide and conquer” operation – which makes it evident how this pattern can be applied to very large data sets. Thanks!

  • Brock / October 17, 2011 / 12:49 PM

    @Praveen/@Julian,

    A MapReduce pipeline is a series of MapReduce jobs. Say you want to find the ten most common words in a corpus of documents.

    First you need to calculate word counts. Second, you need to sort the output of the word count by count to select the top ten words.

    That is two MapReduce jobs, often called a “pipeline”.
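
    In Crunch, that two-job pipeline might look roughly like the sketch below. (Illustrative only: Aggregate.top is an assumed helper for picking the N largest values in a PTable and may not exist in the current release; TopWords is a hypothetical enclosing class, and everything else matches the WordCount example in the post.)

    // Job 1: count each unique word (as in the post's WordCount example).
    Pipeline pipeline = new MRPipeline(TopWords.class); // TopWords: hypothetical enclosing class
    PCollection<String> words = pipeline.readTextFile(args[0])
        .parallelDo(new DoFn<String, String>() {
          public void process(String line, Emitter<String> emitter) {
            for (String word : line.split("\\s+")) {
              emitter.emit(word);
            }
          }
        }, Writables.strings());
    PTable<String, Long> counts = Aggregate.count(words);
    // Job 2: order by count and keep the ten most common words.
    PTable<String, Long> topTen = Aggregate.top(counts, 10, true); // assumed helper
    pipeline.writeTextFile(topTen, args[1]);
    pipeline.done();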

  • Brock Noland / October 17, 2011 / 1:20 PM

    Also, there is a discussion of Crunch vs Pig vs Cascading here:

    https://github.com/cloudera/crunch/wiki/Frequently-Asked-Questions

  • huh / October 17, 2011 / 5:09 PM

    Would be cool if you could set up the pipeline creation with Spring so you don’t have to write that code for every pipeline. Oh wait, now it’s Oozie. :)

  • Jeff Zhang / October 18, 2011 / 4:28 PM

    Integrating MapReduce with Scala would be an excellent solution. Scala is a functional programming language and a natural fit for the MapReduce model. I have always imagined users being able to use MapReduce as easily as the collection classes in Java or Scala.
