How-To: Run a MapReduce Job in CDH4

This is the first post in a series that will get you going on writing, compiling, and running a simple MapReduce job on Apache Hadoop. The full code, along with tests, is available at http://github.com/cloudera/mapreduce-tutorial. The program will run on either MR1 or MR2.

We’ll assume that you have a running Hadoop installation, either locally or on a cluster, and that your environment is set up correctly so that typing “hadoop” at the command line prints usage information. Detailed instructions for installing CDH, Cloudera’s open-source, enterprise-ready distro of Hadoop and related projects, are available here: https://ccp.cloudera.com/display/CDH4DOC/CDH4+Installation. We’ll also assume you have Maven installed on your system, as this will make compiling your code easier. Note that Maven is not a strict dependency; we could also compile with javac on the command line or with an IDE like Eclipse.

The Use Case

There’s been a lot of brawling on our pirate ship recently. Every so often, one of the mates will punch another in the mouth, knocking a tooth out onto the deck. Our poor sailors wake up the next day with an empty bottle of rum, wondering who’s responsible for the gap between their teeth. All this violence has gotten out of hand, so as a deterrent, we’d like to provide everyone with a list of everyone who’s ever left them with a gap. Luckily, we’ve been able to set up a Flume source so that every time someone punches someone else, the event gets written out as a line in a big log file in Hadoop. To turn this data into these lists, we need a MapReduce job that can 1) invert the mapping from attacker to victim, 2) group by victim, and 3) eliminate duplicates.

The Input Data

The input is a file or set of text files in which each line represents a specific instance of punching: a pirate, a tab character as a delimiter, and then a victim. An example input file is located in the GitHub repo at https://github.com/cloudera/mapreduce-tutorial/blob/master/samples/gaplog.txt.
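Before bringing Hadoop into it, the whole transformation can be sketched in plain Java against a few made-up lines in that tab-separated format (the pirate names here are invented for illustration):

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class GapSketch {
  // Invert attacker->victim pairs, group by victim, and de-duplicate attackers
  public static Map<String, Set<String>> gapLists(String[] log) {
    Map<String, Set<String>> gaps = new TreeMap<String, Set<String>>();
    for (String line : log) {
      // Split on the first tab: attacker on the left, victim on the right
      String[] parts = line.split("\t", 2);
      String attacker = parts[0];
      String victim = parts[1];
      if (!gaps.containsKey(victim)) {
        gaps.put(victim, new TreeSet<String>());
      }
      gaps.get(victim).add(attacker);
    }
    return gaps;
  }

  public static void main(String[] args) {
    String[] log = { "anne\tbill", "bill\tanne", "anne\tbill", "carl\tbill" };
    for (Map.Entry<String, Set<String>> e : gapLists(log).entrySet()) {
      System.out.println(e.getKey() + "\t" + e.getValue());
    }
    // prints:
    // anne	[bill]
    // bill	[anne, carl]
  }
}
```

MapReduce gives us the middle step, grouping by victim, for free via the shuffle; the sketch has to do it by hand with a map.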

Writing Our GapDeduce Program

Our map function simply inverts the mapping of punchers to their targets.

public class Gapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
  public void map(Text attacker, Text victim, OutputCollector<Text, Text> output,
      Reporter reporter) throws IOException {
    output.collect(victim, attacker);
  }
}

By using the victim as our map output/reduce input key, the shuffle groups by victim.

Our reduce function is a little more complicated but not by much. It uses a TreeSet to eliminate duplicates and order by name, and then outputs the list of pirates as a single string.

public class Deducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
  public void reduce(Text key, Iterator<Text> values,
      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    Set<String> attackers = new TreeSet<String>();
    while (values.hasNext()) {
      String valStr = values.next().toString();
      attackers.add(valStr);
    }
    output.collect(key, new Text(attackers.toString()));
  }
}
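The de-duplication, the alphabetical ordering, and even the bracketed output format all come from TreeSet; the bracketed string is just what its inherited toString() produces. A quick standalone check (names invented):

```java
import java.util.Set;
import java.util.TreeSet;

public class TreeSetDemo {
  public static void main(String[] args) {
    Set<String> attackers = new TreeSet<String>();
    // Duplicate adds are silently dropped; iteration order is alphabetical
    attackers.add("smee");
    attackers.add("flint");
    attackers.add("smee");
    System.out.println(attackers.toString()); // prints "[flint, smee]"
  }
}
```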

Finally, we write a driver that will run the job.

public static void main(String[] args) throws Exception {
  JobConf conf = new JobConf();
  conf.setJobName("gapdeduce");

  // This line specifies the jar Hadoop should use to run the mapper and
  // reducer by telling it a class that’s inside it
  conf.setJarByClass(GapDeduceRunner.class);

  conf.setMapOutputKeyClass(Text.class);
  conf.setMapOutputValueClass(Text.class);

  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  conf.setMapperClass(Gapper.class);
  conf.setReducerClass(Deducer.class);

  // KeyValueTextInputFormat treats each line as an input record,
  // and splits the line by the tab character to separate it into key and value
  conf.setInputFormat(KeyValueTextInputFormat.class);
  conf.setOutputFormat(TextOutputFormat.class);

  FileInputFormat.setInputPaths(conf, new Path(args[0]));
  FileOutputFormat.setOutputPath(conf, new Path(args[1]));

  JobClient.runJob(conf);
}
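The splitting that KeyValueTextInputFormat performs is worth seeing in isolation: the key is everything before the first tab, and the value is everything after it. A rough plain-Java imitation of that behavior (this is an illustrative stand-in, not the actual Hadoop class):

```java
public class TabSplit {
  // Mimic KeyValueTextInputFormat's per-line split: key = text before the
  // first tab, value = everything after it; a line with no tab yields an
  // empty value
  public static String[] splitLine(String line) {
    int tab = line.indexOf('\t');
    if (tab < 0) {
      return new String[] { line, "" };
    }
    return new String[] { line.substring(0, tab), line.substring(tab + 1) };
  }

  public static void main(String[] args) {
    String[] kv = splitLine("blackbeard\tsilver");
    System.out.println("key=" + kv[0] + " value=" + kv[1]);
    // prints "key=blackbeard value=silver"
  }
}
```

Note that only the first tab matters, so a value containing tabs of its own would survive intact.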

Compiling and Packaging

To run our program, we need to compile it and package it into a jar that can be sent to the machines in our cluster. To do this with Maven, we set up a project whose POM contains the hadoop-client dependency.

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.0.0-cdh4.1.0</version>
</dependency>

Furthermore, we include the following repositories so Maven knows where to get the bits.

<repository>
  <id>maven-hadoop</id>
  <name>Hadoop Releases</name>
  <url>https://repository.cloudera.com/content/repositories/releases/</url>
</repository>
<repository>
  <id>cloudera-repos</id>
  <name>Cloudera Repos</name>
  <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

The full pom.xml is included in the GitHub repository at https://github.com/cloudera/mapreduce-tutorial/blob/master/pom.xml.

Then, in the project’s root directory, we run mvn install. This command both compiles our code and generates a jar, which will show up in the target/ directory under the project root.

Running Our MapReduce Program

The GitHub repo contains a sample input file and an expected output file. We can put the sample input into an HDFS directory called “/gaps” using

hadoop fs -mkdir /gaps
hadoop fs -put samples/gaplog.txt /gaps

We submit our MapReduce job using “hadoop jar”, which is similar to “java -jar” but includes all the environment necessary for running our jar on Hadoop.

hadoop jar target/gapdeduce-1.0-SNAPSHOT.jar GapDeduceRunner /gaps/gaplog.txt /gaps/output

To inspect the output file of our job, we can run

hadoop fs -cat /gaps/output/part-00000

At last, we can provide our mates with a full accounting of their gaps. 

In the next post in this series, we’ll cover some advanced MapReduce features like the distributed cache, as well as testing MapReduce code with MRUnit.

Sandy Ryza is a Software Engineer at Cloudera, working on the Platform team.

6 Responses
  • Assirk / December 20, 2012 / 7:50 PM

    Nice project! I was reading the group-by example: imagine we have a register of daily unique visits for each URL: [“url”, “date”, “visits”]. We want to calculate the unique visits up to each date from that register, and noticed that the cumulative metric computed is not actually unique visits, as there might be duplicated visits across days. Anyway, nice abstraction on top of Hadoop!

  • Jagan / March 06, 2013 / 5:01 AM

    Nice

  • David Parks / April 24, 2013 / 12:16 AM

    How do you use MultipleInputs in this example? MultipleInputs requires a “Job” object, not a JobConf object, and it’s not clear to me when to use one vs. the other.

  • Sandy Ryza / April 24, 2013 / 11:53 AM

    Hi David,

    MapReduce has two APIs, an old one that uses JobConf, and a new improved one that uses Job.

    My follow up post http://blog.cloudera.com/blog/2013/02/how-to-run-a-mapreduce-job-in-cdh4-using-advanced-features/ shows how to write a job with the new API.

  • Rishq / July 26, 2013 / 4:00 PM

    Can I run a MapReduce job on CDH4 without rebuilding it with the new dependencies in pom.xml? (I built and ran it on Apache Hadoop before.)
