How-to: Tune MapReduce Parallelism in Apache Pig Jobs

Thanks to Wuheng Luo, a Hadoop and big data architect at Sears Holdings, for the guest post below about Pig job-level performance tuning.

Many factors can affect Apache Pig job performance in Apache Hadoop, including hardware, network I/O, cluster settings, code logic, and algorithm. Although the sysadmin team is responsible for monitoring many of these factors, there are other issues that MapReduce job owners or data application developers can help diagnose, tune, and improve. One such example is a disproportionate Map-to-Reduce ratio—that is, using too many reducers or mappers in a Pig job.

There are two simple reasons why using too many mappers or reducers should be avoided. First, it can inhibit your MapReduce job’s performance. It’s a myth that the more mappers and reducers we use, the faster our jobs will run—the fact is, each MapReduce task carries certain overhead, and the communication and data movement between mappers and reducers take resources and time. Thus, tuning your job so that workload is evenly distributed across reducers, with as little skew as possible, is much more effective than blindly increasing the number of mappers or reducers. Furthermore, the number of MapReduce tasks that can run simultaneously on each machine is limited. Given these facts, using more mappers or reducers than you actually need will slow your job down rather than speed it up.

Second, using more mappers or reducers than needed can have a significant negative impact on other MapReduce applications, and can constrain your Hadoop cluster’s overall performance. We all know that a Hadoop cluster is a distributed, resource sharing grid, but we often forget or ignore the fact that we are not running our own MapReduce jobs in a vacuum.

For example, a typical cluster’s total MapReduce task capacity is generally configured with a map-to-reduce slot ratio of about 1:0.7. A Hadoop cluster with 100 data nodes, for instance, could have 1,000 map slots and 700 reduce slots allocated in total, or about 10 map slots and 7 reduce slots per data node on average. With Hadoop 2, YARN’s default capacity scheduler allows us to partition applications into queues based on the nature, type, and priority of MapReduce jobs, and to cap the resources a MapReduce task can take.

There are scenarios, however, in which a job’s request for more resources than it actually needs can be granted, if other queues’ resources are under-utilized and free resources are available at that moment. In that case, a queue could be allocated unnecessary resources beyond its defined capacity, since the ResourceManager is not designed to distinguish reasonable resource requests from unreasonable or excessive ones. Therefore, it is our responsibility as developers not to waste or abuse shared resources in a way that interferes with the normal operation of other MapReduce jobs in the same cluster.

Diagnosis

Well, you might ask, how do I know whether I used too many reducers or mappers in my Pig jobs? Listed below are some simple steps to help you diagnose your jobs.

  1. Go to the job history portal, or in Hadoop 1, the Jobtracker page, find your completed job, and look at the numbers of mappers and reducers used. If the number of your reducers is equal to or even greater than that of mappers, that is the first sign that your job might have a map-to-reduce ratio issue.
  2. Check the number and size of your Pig job’s input files. If the number of mappers used in your Pig job is equal to the number of input files (that is, one mapper for each input file), and the size of your input files is less than the default Hadoop data split size (64 or 128 MB), then it is likely that you used too many mappers.
  3. On the job history or Jobtracker portal, check the actual execution time of your MapReduce tasks. If the majority of your completed mappers or reducers were very short-lived, say, under 10-15 seconds, and if your reducers ran even faster than your mappers, that’s a good sign you might have used too many reducers and/or mappers.
  4. Go to your job’s output directory in HDFS and check the number of output part-* files. If that number is greater than your planned reducer number (each reducer is supposed to generate one output file), it indicates that your planned reducer number somehow got overridden and you used more reducers than you meant to.
  5. Check the size of your output part-* files. If the planned reducer number is 50, but some of the output files are empty, that is a good indicator that you over-allocated reducers for your job and wasted the cluster’s resources.
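
Checks 4 and 5 can be scripted. The following is a minimal sketch, assuming the usual eight-column listing printed by hdfs dfs -ls (file size in the fifth column, path in the last column). The function name count_part_files and the output directory in the usage note are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: summarize reducer output files.
# Reads `hdfs dfs -ls <output_dir>` lines on stdin and prints
# "<number of part-* files> <number of empty part-* files>".
count_part_files() {
  awk '
    # Only consider paths whose last component starts with "part-";
    # this skips the "Found N items" header and the _SUCCESS marker.
    $NF ~ "/part-[^/]*$" {
      parts++
      if ($5 == 0) empty++   # column 5 is the file size in bytes
    }
    END { printf "%d %d\n", parts, empty }
  '
}
```

A possible invocation is hdfs dfs -ls /user/my_name/test/output_dir | count_part_files. If the first number is larger than your planned reducer count, or the second number is well above zero, the job probably used more reducers than it needed.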

Solutions

If you find your job has one or more of the symptoms mentioned above, how do you fix it? Here are a few ideas that can help you tune and improve the MapReduce performance at job level while you design, implement, and test your applications.

  1. Generally, the mappers-to-reducers ratio should be about 4:1. A Pig job with 400 mappers usually should not have more than 100 reducers. If your job has combiners implemented (using group … by and foreach together without other operators in between), that ratio can be as high as 10:1; that is, for 100 mappers, 10 reducers are sufficient.
  2. For Symptom 2 mentioned previously, when there are a lot of small input files, smaller than the default data split size, then you need to enable the combination of data splits and set the minimum size of the combined data splits to a multiple of the default data split size, usually 256 or 512MB or even more. Specifically, set these Pig parameters as shown:
  3. Do not rely solely on a generic default reduce parallelism setting, such as a SET default_parallel … line at the very beginning of your Pig code. Generally, hard-coding a fixed number of reducers in Pig using default_parallel or parallel is a bad idea. Instead, calculate the appropriate number of mappers based on the split size of your input data (the split size should be a multiple of the system default, normally in the range of 128-512MB, and possibly higher if the input data is at multi-TB scale), and then apply the ratio mentioned in Solution 1 above to come up with the proper number of reducers. For example, if each of your mappers takes an input data split of about 512MB, each reducer can then take about 2GB of data (512MB x 4) based on the recommended ratio. Another way to estimate the right number of reducers is to look at the reduce task run time, which is usually between 5 and 15 minutes on average.
  4. If you don’t have a set default_parallel line in your script, and don’t use the keyword parallel anywhere in your code, then at the script/application level Pig has two properties that users can adjust toward appropriate reduce parallelism: reducers.bytes.per.reducer (in full, pig.exec.reducers.bytes.per.reducer), which has a default value of 1GB but, as mentioned in Solution 3, can be set to 2GB or even higher, depending on the total input size and how optimized your script is; and reducers.max (in full, pig.exec.reducers.max), which has a default setting of 999 but should be set to a much lower number to cap the maximum number of reducers a Pig job can take. Between the two values, (total input size / bytes.per.reducer) and reducers.max, Pig will give precedence to the one that is smaller. If your total input data is about 100GB, possible settings could include:
  5. If you know what you are doing and really need more control over each and every one of the reduce phases in your Pig script, then instead of setting bytes per reducer as outlined in Solution 4, you may specify the reducer number with the keyword parallel in each reduce block, such as GROUP, JOIN, ORDER BY, DISTINCT, and CROSS, so that your job-specific reduce settings take over and override possibly inappropriate default settings.
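
The arithmetic behind Solutions 1 through 4 can be sketched in a few shell functions. This is only an illustration under stated assumptions: the function names are hypothetical, the reducer estimate follows the "smaller of the two" rule described in Solution 4, and the -D options use the Pig property names pig.splitCombination, pig.maxCombinedSplitSize, pig.exec.reducers.bytes.per.reducer, and pig.exec.reducers.max. The specific values are examples, not the settings from the original listings.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for sizing a Pig job from its total input size.

# Mappers: one per input split, rounded up.
estimate_mappers() {
  local input_bytes=$1 split_bytes=$2
  echo $(( (input_bytes + split_bytes - 1) / split_bytes ))
}

# Reducers: ceil(input / bytes_per_reducer), at least 1,
# capped at max_reducers (the smaller value wins).
estimate_reducers() {
  local input_bytes=$1 bytes_per_reducer=$2 max_reducers=$3
  local n=$(( (input_bytes + bytes_per_reducer - 1) / bytes_per_reducer ))
  if [ "$n" -lt 1 ]; then n=1; fi
  if [ "$n" -gt "$max_reducers" ]; then n=$max_reducers; fi
  echo "$n"
}

# Build the -D options to put on the pig command line.
pig_tuning_opts() {
  local combined_split_bytes=$1 bytes_per_reducer=$2 max_reducers=$3
  echo "-Dpig.splitCombination=true" \
       "-Dpig.maxCombinedSplitSize=${combined_split_bytes}" \
       "-Dpig.exec.reducers.bytes.per.reducer=${bytes_per_reducer}" \
       "-Dpig.exec.reducers.max=${max_reducers}"
}

# Example sizes (assumptions): 100GB input, 512MB splits, 2GB per reducer.
MB=$(( 1024 * 1024 ))
GB=$(( 1024 * MB ))
INPUT=$(( 100 * GB ))
echo "mappers:  $(estimate_mappers "$INPUT" $(( 512 * MB )))"
echo "reducers: $(estimate_reducers "$INPUT" $(( 2 * GB )) 100)"
```

With these example sizes the sketch yields 200 mappers and 50 reducers, which matches the 4:1 ratio from Solution 1. The options could then be passed along the lines of pig $(pig_tuning_opts $(( 512 * MB )) $(( 2 * GB )) 100) -f my_script.pig, where my_script.pig is a placeholder name.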

Conclusion

Tuning Pig performance at job level by using appropriate numbers of mappers and reducers involves many things, and is contingent on resources, job workload, other jobs in the same environment, and other factors. Therefore, it may take some iteration to achieve tangible improvement. Just use your best judgment, and you’ll get there!

Wuheng Luo is currently a Hadoop and big data architect at Sears Holdings. His technical experience includes working on Hadoop data pipelines at Yahoo! He has presented at IEEE Big Data conferences and Hadoop Summit. 

8 responses on “How-to: Tune MapReduce Parallelism in Apache Pig Jobs”

  1. Zach Beniash

    Hi Wuheng Luo,
    Very interesting and useful article.
    In solution number 4 you have mentioned that the number of reducers calculation being done by (total input size / bytes.per.reducer).
    I wonder if there is a way to consume the value of ‘total input size’ for other purposes within my Pig script.
    For example, I would like to group my input data into small bags with approximately size of 1000 records each.
    In order to achieve that I could do (assuming RAND returns evenly distributed results):

    %declare RECORD_AVG_SIZE_BYTES 50;
    %declare NUM_OF_RECORDS = $total_input_size/$RECORD_AVG_SIZE_BYTES;
    %declare NUM_OF_GROUPS = $NUM_OF_RECORDS/1000;
    .
    .
    groupedToThousand = GROUP input BY RANDOM($NUM_OF_GROUPS );
    .
    .

    So, is there a way to find out dynamically what is the value of total_input_size?

      1. Zach Beniash

        Hi Nitish,
        As a matter of fact, this approach works and is commonly used in my project.
        We do not use the RANDOM function as I showed in my example above, because that would not allow us to group the tuples based on their values.
        What we did was create a custom UDF to perform a hash modulo operation on a given tuple hash. The modulo value is provided to the UDF through its constructor, so the exec method returns: (input.hashCode() & Integer.MAX_VALUE) % moduloInt;

  2. Wuheng Luo

    That’s a good question, Zach. The value for total input size in bytes is not readily available in a Pig script, but it can be obtained by issuing this hdfs command “hdfs dfs -du -s ” if your input data is stored in HDFS, or this shell command “du -s ” if your input data resides locally in a Linux file system.

    Theoretically we could use the “%declare” statement in Pig to assign that value by using back quotes, the same way as we sometimes do “%declare DATE `date +%Y%m%d`;” in Pig; the problem is, Pig does not like characters such as “-” or “/” within back quotes, so using syntax like “%declare INPUT_SIZE `fs -du -s /my/hdfs/input/dir`” won’t work. If you or anyone else can make the “%declare” or “DEFINE” statement work in the fashion I mentioned, please let us know. Otherwise, we might want to open a Pig JIRA ticket for that.

    Until that is taken care of, the workaround I would suggest is to create a shell script to invoke your Pig job; that is, run the hdfs command in the shell script and pass the value as a parameter to your Pig program. A couple of things to note here: if you have multiple input sources and use more than one “LOAD” statement in your Pig code, you need to calculate the sum of the sizes of all input datasets in the shell script before assigning the total size. Also, when the input size is larger than the integer type can hold, Pig may throw an error like: ERROR 1200: For input string: “74312692689”, since it assumes a default int type in implicit casting. The way I work around this is to divide the input size in the shell script by a number, say 1000 or 10000, before assigning the value as a Pig param, and then in the Pig script multiply by that number and explicitly cast to a long type, e.g., (long)$TOTAL_INPUT_SIZE * 1000.

    I attach a sample run script in shell and a sample Pig script below for your reference. Doing it this way, the value of total input size in bytes will be available in your Pig script. Hope it helps. Wuheng

    Sample shell script:
    #!/usr/bin/env bash

    # test.sh
    #
    size1=(hdfs dfs -du -s /user/my_name/test/input_dir_1)
    size2=(hdfs dfs -du -s /user/my_name/test/input_dir_2)

    pig -Dmapred.map.tasks.speculative.execution="true" \
    -Dmapred.min.split.size=$[1024 * 1024 * 512] \
    -Dmapred.max.split.size=$[1024 * 1024 * 512 * 2] \
    -param TOTAL_INPUT_SIZE=$[(size1 + size2) / 1000] \
    -file /home/auto/my_name/test/test.pig

    # end test.sh

    Sample Pig script:
    -- test.pig

    A = load '/user/my_name/test/input_dir_1' using PigStorage('\u0001');
    B = load '/user/my_name/test/input_dir_2' using PigStorage('\u0001');
    A2 = limit A 2;
    B2 = limit B 2;
    A3 = foreach A2 generate $0;
    B3 = foreach B2 generate $0;
    C = union A3, B3;
    D = foreach C generate $0, (long)$TOTAL_INPUT_SIZE * 1000;
    dump D;

    -- end test.pig

  3. Wuheng Luo

    Somehow the back quotes got lost in copy and paste. Make sure to put the hdfs command in back quotes in the shell script:

    size1=(`hdfs dfs -du -s /user/my_name/test/input_dir_1`)
    size2=(`hdfs dfs -du -s /user/my_name/test/input_dir_2`)
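
The summing and scaling steps described in the workaround above can also be kept in one small function. This is a minimal sketch, assuming that each line printed by hdfs dfs -du -s starts with the size in bytes; the function name scaled_total_size and its scale argument are hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read `hdfs dfs -du -s <dir>` output lines on stdin,
# add up the first column (bytes), and divide by a scale factor so the
# result fits in a Pig int parameter.
scaled_total_size() {
  local scale=${1:-1000} total=0 size rest
  while read -r size rest; do
    # Skip blank lines and anything that does not start with a byte count.
    case $size in
      ''|*[!0-9]*) continue ;;
    esac
    total=$(( total + size ))
  done
  echo $(( total / scale ))
}
```

It could be used as { hdfs dfs -du -s /user/my_name/test/input_dir_1; hdfs dfs -du -s /user/my_name/test/input_dir_2; } | scaled_total_size 1000, with the result passed to Pig via -param TOTAL_INPUT_SIZE=... and multiplied back by 1000 inside the script.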

  4. Zach Beniash

    Thanks for the script references; this should work perfectly with the OOTB PigStorage load.

    However, there are cases where:
    * The Pig script is being executed by Oozie (and not by shell script).
    * The loading is being done by custom LoadFunc (and not by PigStorage).

    In such cases, there is no specific directory with an input file whose size can be measured; the input might come from multiple files (for the same input), or from some DB (HBase/Oracle etc.).

    To overcome this obstacle, the developer can make the custom LoadFunc implement the LoadMetadata interface and its “getStatistics()” method.
    getStatistics() is where the developer would code the logic for measuring the total input size, whether by summing the sizes of the set of files in that input or by querying the relevant DB, etc.

    The getStatistics() method is called in the frontend (driver) of the MapReduce job, so it might be handy to pass the value of total input size from there to all of the mapper tasks, so that it is available as a job configuration property. It could then be extracted from the job configuration using a custom UDF written for that purpose only.

    Is this a valid approach? If not, is there a better way that the developer can pass this information to the job properties, without having to wrap each pig script with a shell script ?

  5. Zach Beniash

    As a reference for the UDF solution approach, see the book “Programming Pig” by Alan Gates (published by O’Reilly Media, Inc.):

    * Section 10, “Writing an Evaluation Function in Java”: “UDFContext”
    * Section 11, “Load Functions”: “Passing Information from the Frontend to the Backend”

  6. abiya

    Excellent article. We hear so much about Map Reduce these days, especially in the NoSQL world, so to see well written set of use cases is very helpful indeed. Thanks.
