Crunch for Dummies

This guide is intended to be an introduction to Crunch.

Introduction

Crunch is used for processing data. It builds on top of Apache Hadoop to provide a simpler interface for Java programmers to process data. In Crunch you create pipelines, not unlike Unix pipelines such as the command below:

grep "ERROR" log | sort | uniq -c

Crunch pipelines consist of a series of functions you apply to the input data. Let’s say you have raw Apache HTTPD server logs and you want to know the total amount of data downloaded per IP address. The raw input consists of lines like these:

96.7.4.14 - - [24/Apr/2011:04:20:11 -0400] "GET /cat.jpg HTTP/1.1" 200 12433 "http://google.com" "Firefox/3.6.16"
88.1.4.5 - - [24/Apr/2011:06:21:41 -0400] "GET /cat.jpg HTTP/1.1" 200 12433 "http://google.com" "Firefox/3.6.16"
96.7.4.14 - - [24/Apr/2011:07:33:59 -0400] "GET /dog.jpg HTTP/1.1" 200 42431 "http://google.com" "Firefox/3.6.16"

The IP address is field one and the number of bytes transferred is field seven.

The steps you would follow:

  1. Parse each input record, outputting (emitting) fields one and seven, e.g. 96.7.4.14 and 12433 for the first entry.
  2. Group all the (ip, response size) records by IP address.
  3. For each IP address, sum up the response sizes.

The data flow would look as follows.

Parse the input data, outputting ip address and response size:

96.7.4.14, 12433
88.1.4.5, 12433
96.7.4.14, 42431

Group by ip:

88.1.4.5, [ 12433 ]
96.7.4.14, [ 42431, 12433 ]

Calculate the sum:

88.1.4.5, 12433
96.7.4.14, 54864

Apache Hadoop

If the dataset were large enough that we could not process it easily on a single server, we might consider Apache Hadoop. However, using Hadoop directly would require learning a complex programming model called MapReduce, or a higher-level language such as Apache Hive or Apache Pig.
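To make the comparison concrete, below is a rough sketch (not from the original example) of what the same job looks like when written directly against the MapReduce API. The class names are illustrative, and the token index assumes the sample log format above:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalBytesByIPMapReduce {

  // Mapper: parse each log line and emit (ip, response size).
  public static class ParseMapper
      extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      String[] fields = line.toString().split(" ");
      // After splitting on spaces, the IP is token 0 and the response
      // size of our sample format lands at token 9.
      if (fields.length > 9) {
        try {
          context.write(new Text(fields[0]),
              new LongWritable(Long.parseLong(fields[9])));
        } catch (NumberFormatException e) {
          // corrupt line, skip it
        }
      }
    }
  }

  // Reducer: sum the response sizes for each IP address.
  public static class SumReducer
      extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text ip, Iterable<LongWritable> sizes, Context context)
        throws IOException, InterruptedException {
      long total = 0;
      for (LongWritable size : sizes) {
        total += size.get();
      }
      context.write(ip, new LongWritable(total));
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "total bytes by ip");
    job.setJarByClass(TotalBytesByIPMapReduce.class);
    job.setMapperClass(ParseMapper.class);
    job.setReducerClass(SumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}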

Crunch

Using Crunch, a Java programmer with limited knowledge of Hadoop and MapReduce can utilize a Hadoop cluster. The program is written in pure Java and does not require the use of MapReduce-specific constructs such as writing a Mapper or Reducer, or using Writable objects to wrap Java primitives.

In Crunch a Pipeline object represents the series of functions (parsing, counting, etc.) you would like to apply to some input data. Crunch programs will generally proceed as follows (a minimal skeleton appears after the list):

  1. Create a Pipeline object
  2. Read input (e.g. text file)
  3. Execute various functions on input data
  4. Write result to some destination (e.g. text file)
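Tying those four steps together, a driver class might be structured like the sketch below. The imports assume the Apache Crunch package layout (org.apache.crunch), and the use of Hadoop’s Tool and Configured, which also supplies the getConf() call seen in the next section, is an assumption:

import org.apache.crunch.PCollection;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TotalBytesByIP extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    // 1. Create a Pipeline object
    Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());
    // 2. Read input (e.g. text file)
    PCollection<String> lines = pipeline.readTextFile(args[0]);
    // 3. Execute various functions on the input data
    //    (the parsing, grouping, and summing shown in the Example section)
    // 4. Write the result to some destination and run the pipeline
    // pipeline.writeTextFile(result, args[1]);
    pipeline.done();
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new TotalBytesByIP(), args);
    System.exit(exitCode);
  }
}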

Example

Let’s look at the code to execute the work we described above, calculating the total number of bytes transferred per IP address.

Creating the Pipeline object:

 Pipeline pipeline = new MRPipeline(TotalBytesByIP.class, getConf());

In the MRPipeline constructor we pass the class containing our code. We also pass the Configuration object given to our class at runtime (via getConf(), which comes from extending Hadoop’s Configured, as in the skeleton above).

Reading input:

 PCollection<String> lines = pipeline.readTextFile(args[0]);

There are a couple of things to note above. First, in Crunch all operations are deferred until we call pipeline.run() or pipeline.done(). Second, because execution is deferred, we do not store the input data in a java.util.Collection but in a Crunch PCollection. There is support for pulling the contents of a Crunch PCollection into memory.
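As a quick sketch of that (assuming the pipeline and lines from above), PCollection provides a materialize() method that returns an Iterable whose contents become available once the pipeline has been run:

// Deferred: nothing has executed yet.
Iterable<String> contents = lines.materialize();
// Run the pipeline, then iterate over the now-available results.
pipeline.run();
for (String line : contents) {
  System.out.println(line);
}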

Processing of the PCollection is done with functions that subclass DoFn. DoFn requires you to implement one method, process(), which is called for each line of the text file (at this point the text file is represented by the PCollection). The function below uses a regular expression to parse each Apache HTTP log line, outputting (emitting) the IP address and the response size.

// Input: 55.1.3.2  ...... 200 512 ....
// Output: (55.1.3.2, 512)
DoFn<String, Pair<String, Long>> extractIPResponseSize = new DoFn<String, Pair<String, Long>>() {
  // Marked transient because DoFn instances are serialized and shipped to
  // the cluster; the Pattern is rebuilt in initialize() on each task.
  transient Pattern pattern;
  @Override
  public void initialize() {
    pattern = Pattern.compile(logRegex);
  }
  @Override
  public void process(String line, Emitter<Pair<String, Long>> emitter) {
    Matcher matcher = pattern.matcher(line);
    if (matcher.matches()) {
      try {
        Long responseSize = Long.parseLong(matcher.group(7));
        String remoteAddr = matcher.group(1);
        emitter.emit(Pair.of(remoteAddr, responseSize));
      } catch (NumberFormatException e) {
        // corrupt line, we should increment a counter
      }
    }
  }
};
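Note that the logRegex string is not defined in the snippet above. Purely as an illustration, a pattern that matches the sample log lines from earlier, with the IP address in group 1 and the response size in group 7, could be declared as a constant in the enclosing class:

// Hypothetical regex for the sample log format shown earlier:
// group(1) = remote IP address, group(7) = response size in bytes.
private static final String logRegex =
    "^(\\S+) (\\S+) (\\S+) \\[([^\\]]+)\\] \"([^\"]+)\" (\\d{3}) (\\d+|-).*";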

Next we need to apply the extractIPResponseSize function to each of the lines in the input, represented by the PCollection lines:

  lines.parallelDo(extractIPResponseSize, ...)

Note, however, that above we have not assigned the result of lines.parallelDo to a variable, and the call is incomplete (the "..." stands in for a second argument): we are required to tell the parallelDo method the type of data that will be returned. In our case we want to group by IP address (the key), so we will return a table (basically a map) of Strings (IP addresses) and Longs (response sizes):

PTable<String, Long> ipAddrResponseSize =
  lines.parallelDo(extractIPResponseSize,
    Writables.tableOf(Writables.strings(), Writables.longs()));
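(The Writables.tableOf(...) argument is a PType, Crunch’s description of how records are serialized between MapReduce stages. Crunch also ships an Avro-based type family, so the same table type could alternatively be expressed as Avros.tableOf(Avros.strings(), Avros.longs()).)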

The above code is nearly complete, but we still have not grouped the data by IP address (the key) nor summed up the response sizes. We need to create what is known as a combiner to do the sum:

// Combiner used for summing up response size
CombineFn<String, Long> longSumCombiner = CombineFn.SUM_LONGS();

Then execute the group-by-key (IP address) and sum operation:

// Table of (ip, sum(response size))
PTable<String, Long> ipAddrResponseSize =
  lines.parallelDo(extractIPResponseSize,
    Writables.tableOf(Writables.strings(), Writables.longs()))
        .groupByKey()
        .combineValues(longSumCombiner);

You will be tempted to try to iterate over ipAddrResponseSize in order to see the results. However, remember that everything in Crunch is deferred until pipeline.run() or pipeline.done() is called. Additionally, this tutorial won’t cover processing the results of a pipeline in memory; we simply write the results to a file.

Below we write the results to a text file and then execute the pipeline:

pipeline.writeTextFile(ipAddrResponseSize, args[1]);
// Execute the pipeline as a MapReduce job.
pipeline.done();
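Assuming the program has been packaged into a jar (the jar and path names below are hypothetical), the job can then be launched with the standard Hadoop launcher:

hadoop jar total-bytes-by-ip.jar TotalBytesByIP /logs/input /logs/output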

If you have made it this far, congratulations! You are ready to start using Crunch.
