Sending Files to Remote Task Nodes with Hadoop MapReduce

It is common for a MapReduce program to require one or more files to be read by each map or reduce task before execution. For example, you may have a lookup table that needs to be parsed before processing a set of records. To address this scenario, Hadoop’s MapReduce implementation includes a distributed file cache that will manage copying your file(s) out to the task execution nodes.
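To make the lookup-table scenario concrete, here is a minimal sketch of what a Hadoop Streaming mapper might do once the file has reached the task node. The tab-separated file layout, the function names, and the "UNKNOWN" default are assumptions made for illustration; in-memory strings stand in for the cached file and for standard input so the snippet runs anywhere.

```python
import io


def load_lookup_table(lines):
    """Parse tab-separated key/value lines into a dict (assumed file layout)."""
    table = {}
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # skip blank lines
        key, value = line.split("\t", 1)
        table[key] = value
    return table


def map_records(records, table, default="UNKNOWN"):
    """Emit 'key<TAB>looked-up value' for each input record."""
    for record in records:
        key = record.rstrip("\n")
        yield key + "\t" + table.get(key, default)


# Stand-ins for the cached lookup file and for the mapper's stdin.
lookup = load_lookup_table(io.StringIO("us\tUnited States\nfr\tFrance\n"))
output = list(map_records(io.StringIO("fr\nde\n"), lookup))
```

In a real mapper you would pass an opened file (for example open("lookup.tsv"), a hypothetical name) to load_lookup_table and sys.stdin to map_records, parsing the table once before the first record is processed.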

The DistributedCache was introduced in Hadoop 0.7.0; see HADOOP-288 for more detail on its origins. There is a great deal of existing documentation for the DistributedCache: see the Hadoop FAQ, the MapReduce Tutorial, the Hadoop Javadoc, and the Hadoop Streaming Tutorial. Once you’ve read the existing documentation and understand how to use the DistributedCache, come on back.

How is the DistributedCache accessed by a MapReduce job?

As of Hadoop 0.18.2, the action happens at the very beginning of the run method of the TaskRunner object. The process of copying the files to the local node is referred to as “localizing the archives”, and it’s performed before starting the local JVM for each task with a call to the static method getLocalCache of the DistributedCache class.

Where does the DistributedCache store data?

When debugging a MapReduce job, it can be useful to know where the DistributedCache is putting the cached files on the local nodes. By default, Hadoop stores these files under

/tmp/hadoop-<user.name>/mapred/local/taskTracker/archive

Every MapReduce task uses the directory specified by the configuration parameter mapred.local.dir to store scratch information; currently, this parameter defaults to <hadoop.tmp.dir>/mapred/local, and hadoop.tmp.dir defaults to /tmp/hadoop-<user.name>.

The static getCacheSubdir method of the TaskTracker class uses the constants TaskTracker.SUBDIR and TaskTracker.CACHEDIR to construct the path under mapred.local.dir where you can find the locally cached files.
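The way those defaults chain together can be sketched in a few lines of Python. This is not Hadoop's code: the function name is hypothetical, the two constant values are inferred from the default path shown above, and the sketch assumes mapred.local.dir names a single directory.

```python
import posixpath

# Assumed values of TaskTracker.SUBDIR and TaskTracker.CACHEDIR,
# inferred from the default path quoted in this post.
SUBDIR = "taskTracker"
CACHEDIR = "archive"


def default_cache_dir(user_name, hadoop_tmp_dir=None, mapred_local_dir=None):
    """Resolve the local cache directory from the chained defaults."""
    if hadoop_tmp_dir is None:
        # hadoop.tmp.dir defaults to /tmp/hadoop-<user.name>
        hadoop_tmp_dir = "/tmp/hadoop-" + user_name
    if mapred_local_dir is None:
        # mapred.local.dir defaults to <hadoop.tmp.dir>/mapred/local
        mapred_local_dir = posixpath.join(hadoop_tmp_dir, "mapred", "local")
    return posixpath.join(mapred_local_dir, SUBDIR, CACHEDIR)


cache_dir = default_cache_dir("alice")
```

If your cluster overrides hadoop.tmp.dir or mapred.local.dir, pass the overridden value in and the rest of the path follows from it.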

How big is the DistributedCache?

The local.cache.size parameter controls the size of the DistributedCache. By default, it’s set to 10 GB.
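The value is expressed in bytes, so a quick conversion helps when you want to set it to something else. The helper below is purely illustrative and not part of Hadoop.

```python
def gigabytes_to_bytes(gigabytes):
    """Convert binary gigabytes to a byte count for local.cache.size."""
    return gigabytes * 1024 ** 3


# The 10 GB default written out in bytes.
default_cache_bytes = gigabytes_to_bytes(10)
```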

How can you add files to the DistributedCache with Hive?

Hive uses the syntax “ADD FILE path/to/file” to pass a file to the DistributedCache.

An alternative: pack the files into the job’s jarfile

When submitting a job via Hadoop Streaming, you can use the -file option to package a file with the jarfile containing your MapReduce program. Using the -file option is the preferred way to package your scripts for execution with Hadoop Streaming, as documented in the Hadoop Streaming Tutorial.

Note the difference between -file, which packs a file on your local filesystem with the jarfile containing your MapReduce program, and the -cacheFile option, which takes any URI accessible by the task nodes (including an HDFS URI). The former will be copied to the task nodes with the rest of the job, while the latter will be explicitly fetched by the DistributedCache.
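To see the two options side by side, here is a rough sketch that assembles a Hadoop Streaming command line in Python without running it. The jar location, input and output paths, script name, and HDFS URI are all placeholders; only the -input, -output, -mapper, -file, and -cacheFile options come from Hadoop Streaming itself.

```python
def build_streaming_command(streaming_jar, input_path, output_path, mapper,
                            packaged_files=(), cache_uris=()):
    """Assemble (but do not run) a Hadoop Streaming command line."""
    command = ["hadoop", "jar", streaming_jar,
               "-input", input_path,
               "-output", output_path,
               "-mapper", mapper]
    for local_path in packaged_files:
        # -file: a local file shipped to the task nodes inside the job's jarfile
        command += ["-file", local_path]
    for uri in cache_uris:
        # -cacheFile: a URI the task nodes can reach, fetched by the DistributedCache
        command += ["-cacheFile", uri]
    return command


# All paths and the URI below are hypothetical.
command = build_streaming_command(
    streaming_jar="/path/to/hadoop-streaming.jar",
    input_path="/user/me/input",
    output_path="/user/me/output",
    mapper="mapper.py",
    packaged_files=["mapper.py", "nltkandyaml.mod"],
    cache_uris=["hdfs://namenode:9000/data/lookup.tsv#lookup.tsv"],
)
print(" ".join(command))
```

The fragment after the # in the -cacheFile URI is the name of the symlink that appears in the task's working directory.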

An example

Okay, you’re now ready to try your hand at a nonstandard application of the above techniques. Instead of reading a file with configuration information, you are going to use the -file option to Hadoop Streaming to distribute an arbitrary Python module required by your job.

First, go grab the zip file under the “Installation to Non-Standard Location” section of the NLTK site. Once you’ve pulled it down to your local box and unzipped it, hop into the download directory and run:

zip -r nltkandyaml.zip nltk yaml

mv nltkandyaml.zip /path/to/where/your/mapper/will/be/nltkandyaml.mod

This creates a zipfile of the two modules you’ll need and gives it a .mod extension. The extension matters because the archive is packaged inside the job’s jarfile when the job is submitted to the JobTracker. With a .zip extension, it would be decompressed when the jarfile is unpacked, and its contents would end up inside the lib/ folder of the task’s working directory on the remote node. With the .mod extension, the archive stays intact in the task’s working directory.

You can now import the nltk module for use in your Python script:

import zipimport

importer = zipimport.zipimporter('nltkandyaml.mod')
yaml = importer.load_module('yaml')
nltk = importer.load_module('nltk')
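If you want to convince yourself that the .mod extension does not get in the way before touching a cluster, the following self-contained sketch builds a tiny archive and imports from it. The module name toy_lookup and the archive name are made up for the demo. It also takes a different route from the snippet above: it puts the archive on sys.path rather than calling load_module, which is deprecated in newer Python 3 releases.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_module_archive(directory, archive_name="toymodules.mod"):
    """Write a zip archive with a non-.zip extension holding one toy module."""
    archive_path = os.path.join(directory, archive_name)
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("toy_lookup.py", "def double(x):\n    return 2 * x\n")
    return archive_path


def import_from_archive(archive_path, module_name):
    """Import a module from a zip archive by placing the archive on sys.path."""
    if archive_path not in sys.path:
        sys.path.insert(0, archive_path)
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


workdir = tempfile.mkdtemp()
archive = build_module_archive(workdir)
toy_lookup = import_from_archive(archive, "toy_lookup")
```

Python decides whether a path entry is a zip archive by inspecting the file's contents, so the extension plays no part in the import.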

In an upcoming post, we’ll look at how we can use NLTK and Hadoop Streaming to do powerful natural language processing over corpora of nearly arbitrary size.

Note: In the 0.19.x release family, the options for Hadoop streaming have changed (see HADOOP-3722). There is also discussion about modifying the “-file” syntax for Hadoop streaming at HADOOP-2622.
