# A profile of Apache Hadoop MapReduce computing efficiency

- by Jon Zuanich (@jonzuanich)
- December 14, 2010

**Guest post from Paul Burkhardt, a Research Developer at SRA International, Inc., where he develops large-scale, distributed computing solutions.**

## Part I

We were asked by one of our customers to investigate Hadoop MapReduce for solving distributed computing problems. We were particularly interested in how effectively MapReduce applications utilize computing resources. Computing efficiency is important not only for speed-up and scale-out performance but also for power consumption. Consider a hypothetical High-Performance Computing (HPC) system of 10,000 nodes that is idle 50% of the time, drawing 50 watts per idle node, with electricity at 10 cents per kilowatt-hour. Powering just the idle time would cost $219,000 per year. Keeping a large HPC system busy is difficult and requires huge datasets and efficient parallel algorithms.

We wanted to analyze Hadoop applications to determine their computing efficiency and to gain insight into tuning and optimizing them. We installed CDH3 on a number of different clusters as part of our comparative study. We preferred CDH3 over the standard Hadoop installation for its recent patches and the support offered by Cloudera.

In the first part of this two-part article, we more formally define computing efficiency as it relates to evaluating Hadoop MapReduce applications and describe the performance metrics we gathered for our assessment. The second part will describe our results, conclude with suggestions for improvements, and hopefully instigate further study of Hadoop MapReduce performance analysis.
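The idle-power estimate in this introduction can be checked with a few lines of arithmetic. The sketch below is only an illustration: the function name `idle_power_cost` is hypothetical, and it assumes a 365-day year with a constant idle draw.

```python
# Hypothetical helper that reproduces the idle-power estimate from the text.
HOURS_PER_YEAR = 24 * 365  # 8,760 hours, ignoring leap years (an assumption)


def idle_power_cost(nodes, idle_fraction, idle_watts, dollars_per_kwh):
    """Return the yearly cost, in dollars, of powering idle node time."""
    idle_kw = nodes * idle_fraction * idle_watts / 1000.0  # average idle draw in kW
    idle_kwh = idle_kw * HOURS_PER_YEAR                     # energy per year in kWh
    return idle_kwh * dollars_per_kwh


cost = idle_power_cost(nodes=10_000, idle_fraction=0.5,
                       idle_watts=50, dollars_per_kwh=0.10)
print(f"${cost:,.0f} per year")
```

With the figures quoted above (250 kW of average idle draw over 8,760 hours), this gives the $219,000 per year stated in the text.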

For distributed computing to be effective there must be sufficient input and parallelism to saturate the compute resources. Saturation in the context of Hadoop MapReduce refers to maintaining the maximum task rate for the duration of the application run. The maximum task rate is defined as the total number of task slots divided by the time to complete a single task. The maximum task rate is therefore the upper bound on throughput. Our goal is to reach steady state at peak throughput. A system in steady state can be modeled using Little’s Law from queueing theory. The equation for Little’s Law defines the average throughput for any steady-state system and can be written as

$$\lambda = \frac{N}{T}$$

This law is often used to calculate the concurrency in HPC systems. Our queue length, N, is the total number of task slots and the queue time, T, is the time it takes for a task to be assigned and completed. We can determine the average throughput, λ, by dividing the total number of task slots by the average task duration. The ratio of the average throughput to the maximum task rate indicates the computing efficiency, that is, how well we have saturated the resources. For example, consider a system of sixteen hosts with sixteen task slots per host and a workload of identical tasks that each complete in one minute. The maximum task rate for this system is 16×16=256 tasks per minute. If the system sustains this rate at steady state, it is 100% saturated. If the system is at steady state but the throughput is less than the maximum task rate, it is under-utilized.

Setting the number of concurrent tasks for the map and reduce phases requires modifying the following parameters in the mapred-site.xml file on each host:

mapred.tasktracker.map.tasks.maximum

mapred.tasktracker.reduce.tasks.maximum

We set the values for these parameters to the actual number of execution units on a host. For example, a host with two quad-core CPUs, each core capable of two simultaneous threads, has a total of 2×4×2=16 processing elements (PEs). The maximum task parameters should not be confused with the parameters that set the total number of map and reduce tasks for an entire job:

mapred.map.tasks

mapred.reduce.tasks
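The slot and saturation arithmetic described above can be sketched in a few lines. This is a minimal illustration, not the tooling used in the study: the function names are hypothetical, and the observed rate of 192 tasks per minute is a made-up measurement used only to show the efficiency ratio.

```python
def processing_elements(sockets, cores_per_socket, threads_per_core):
    """Execution units on one host, used here as the per-host task slot count."""
    return sockets * cores_per_socket * threads_per_core


def max_task_rate(total_slots, task_duration):
    """Little's Law rearranged for throughput: N slots / T time per task."""
    return total_slots / task_duration


def efficiency(observed_rate, peak_rate):
    """Ratio of the average throughput to the maximum task rate."""
    return observed_rate / peak_rate


slots_per_host = processing_elements(sockets=2, cores_per_socket=4,
                                     threads_per_core=2)   # 2 x 4 x 2 = 16
total_slots = 16 * slots_per_host                           # sixteen hosts
peak = max_task_rate(total_slots, task_duration=1.0)        # tasks per minute
observed = 192.0  # hypothetical measured rate, tasks per minute
print(peak, efficiency(observed, peak))
```

For the sixteen-host example the peak is 256 tasks per minute, so the hypothetical observed rate corresponds to a system that is 75% saturated.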

Having established the condition for system saturation, we devised two scalability studies to identify how well Hadoop MapReduce applications scale for fixed- and scaled-size problems. We know by Amdahl’s Law, defined in the following equation, that only an algorithm that can be decomposed into mostly independent tasks, an embarrassingly parallel algorithm, can scale linearly for fixed-size problems.

$$S = \frac{1}{f_{s} + \frac{f_{p}}{n}}$$

The speedup, S, for a parallel algorithm depends on its parallel and serial proportions, denoted in the equation by f_{p} and f_{s}, and the number of compute nodes, n. The speedup is bounded by the serial fraction, and unless algorithms can be expressed with a high degree of parallelism, the performance is rapidly limited regardless of the available compute resources. Despite this stringent requirement, we can efficiently utilize algorithms that have maximized their parallelism by scaling the problem size (cf. Gustafson’s Law).
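To make the contrast concrete, the sketch below evaluates the two laws in their commonly stated forms: Amdahl's fixed-size speedup, S = 1 / (f_s + f_p / n), and Gustafson's scaled speedup, S = f_s + n·f_p, with f_s = 1 − f_p. The function names and the 95% parallel fraction are illustrative assumptions, not values from our study.

```python
def amdahl_speedup(parallel_fraction, n):
    """Fixed-size speedup on n nodes: 1 / (f_s + f_p / n)."""
    serial_fraction = 1.0 - parallel_fraction
    return 1.0 / (serial_fraction + parallel_fraction / n)


def gustafson_speedup(parallel_fraction, n):
    """Scaled-size speedup on n nodes: f_s + n * f_p."""
    serial_fraction = 1.0 - parallel_fraction
    return serial_fraction + parallel_fraction * n


# Illustrative parallel fraction of 95% (an assumption for this example).
for n in (1, 16, 256, 4096):
    print(n,
          round(amdahl_speedup(0.95, n), 2),
          round(gustafson_speedup(0.95, n), 2))
```

With a 5% serial fraction the fixed-size speedup can never exceed 1 / 0.05 = 20 no matter how many nodes are added, whereas the scaled speedup keeps growing with n because the problem size grows with the machine.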

For fixed-size problems, we scale the compute resources with constant input size to determine node scalability, the throughput versus the number of nodes. Here, nodes refer to hosts in the cluster, each of which may be composed of many processing elements (PEs). Data scalability refers to our study of scaled-size problems, in which we plot the throughput against increasing input size in a system with fixed compute resources.

We use the Ganglia cluster monitoring tool^{[1]} to measure hardware statistics and the Hadoop job history log for analyzing job performance. The job history file is stored in the job output directory in HDFS and in the Hadoop log directory. The following Hadoop job command will summarize the job history.

`hadoop job -history <hdfs job output directory>`

We developed a simple Perl script to parse the raw job history log into three datasets for plotting the execution profile, concurrency, scalability, and load balance. The time series data for the tasks per job phase comprises one dataset, and another dataset compiles the duration time and workload for each task per host. The final dataset is a summary of aggregated job metrics including the wall clock time and processed bytes; a complete listing is available in **Table 1**.

| Metric | Units |
| --- | --- |
| Job start time since Unix Epoch | milliseconds |
| Job stop time since Unix Epoch | milliseconds |
| Input size | bytes |
| Total read | bytes |
| Total written | bytes |
| Total shuffled | bytes |
| Total spilled | bytes |
| Throughput | bytes/second |
| Total input splits | |
| Total map tasks | |
| Total reduce tasks | |
| Average map duration | seconds |
| Average reduce duration | seconds |
| Average shuffle duration | seconds |
| Average sort duration | seconds |
| Total map nodes | |
| Total reduce nodes | |
| Rack-local map tasks | |
| Data-local map tasks | |

**Table 1 Job Metrics.**

From these three datasets we are able to plot the following five functions:

- job phases
- task rates
- job histograms
- node scalability
- data scalability

The job phase plots indicate the magnitude and duration of the map, reduce, shuffle, and sort phases. The phases are plotted as the instantaneous task count versus time and were inspired by the plots published by the Yahoo Terasort benchmark^{[2]}. The plot data comes from the time series dataset generated by our job parser. A phase that saturates the available compute resources will be depicted by a plateau, the steady-state. The plot is a great indicator of both concurrency and the performance profile.
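The instantaneous task count behind a job phase plot can be derived from task start and finish times with a simple event sweep. The sketch below is an assumption about how such a series might be computed, not our Perl parser; the function name and the sample intervals are hypothetical.

```python
def concurrency_series(intervals):
    """Return [(time, running_tasks)] at each distinct start/finish time.

    intervals: iterable of (start, finish) pairs with start <= finish.
    """
    events = []
    for start, finish in intervals:
        events.append((start, 1))    # a task begins
        events.append((finish, -1))  # a task ends
    # Tuples sort by time first; at equal times the -1 events come before +1.
    events.sort()

    series = []
    running = 0
    for time, delta in events:
        running += delta
        if series and series[-1][0] == time:
            series[-1] = (time, running)  # keep only the final count per timestamp
        else:
            series.append((time, running))
    return series


# Hypothetical map-task intervals, in seconds, on a cluster with two map slots.
map_tasks = [(0, 60), (0, 60), (60, 120), (60, 120), (120, 150)]
series = concurrency_series(map_tasks)
print(series)
```

In this made-up example the count holds at two, the number of slots, until the last task runs alone; that flat stretch is the plateau that indicates saturation.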

The task rates are plotted as the cumulative task count versus task duration. A saturated system should display a linear slope where the rate of incoming tasks is constant and equal to the rate of completed tasks. Straggling tasks will appear as tracks. If constrained by resource availability the slope will decline. The task rates are plotted from our second dataset of task duration. The plots illustrate the throughput and concurrency, as well as identify individual task bottlenecks.

The job histograms elucidate potential load-balancing issues by depicting the variance between the compute hosts for task assignment, task duration, and disk I/O. A common issue in distributed systems is “hot-spotting”, where one node is tasked much more than others or a location in memory or on disk is in high demand. This situation typically leads to bottlenecks. The histograms are also plotted from our second dataset, which includes the relevant workload and duration time per task.
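A per-host summary of this kind can be sketched as follows. The record layout `(host, duration_seconds, bytes)`, the function names, and the max-to-mean imbalance ratio are all assumptions made for illustration; they are one simple way to quantify hot-spotting, not necessarily how our histograms were produced.

```python
from collections import defaultdict
from statistics import mean


def per_host_summary(task_records):
    """Group (host, duration_seconds, bytes) records into per-host totals."""
    by_host = defaultdict(list)
    for host, duration, nbytes in task_records:
        by_host[host].append((duration, nbytes))

    summary = {}
    for host, tasks in by_host.items():
        summary[host] = {
            "tasks": len(tasks),
            "mean_duration": mean(d for d, _ in tasks),
            "bytes": sum(b for _, b in tasks),
        }
    return summary


def imbalance(summary, key="tasks"):
    """Ratio of the busiest host to the average host; 1.0 means perfectly even."""
    values = [stats[key] for stats in summary.values()]
    return max(values) / mean(values)


# Hypothetical task records: node01 receives three tasks, node02 only one.
records = [("node01", 60, 100), ("node01", 62, 100),
           ("node01", 58, 100), ("node02", 61, 100)]
summary = per_host_summary(records)
print(summary["node01"]["tasks"], imbalance(summary))
```

A ratio well above 1.0 for task count, duration, or bytes points to the host worth inspecting first.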

The final two functions compare the node and data scalability across different clusters as a function of throughput. The throughput is the total input size in bytes divided by the wall clock time, and a node refers to a host in the cluster.
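The throughput definition maps directly onto the job metrics in Table 1, where the start and stop times are in milliseconds. The sketch below computes it and, as an added assumption that is not part of our method, normalizes throughput against the smallest run to give a rough scaling efficiency. All names and sample values are hypothetical.

```python
def throughput_bytes_per_second(input_bytes, start_ms, stop_ms):
    """Input size divided by wall clock time, with times in epoch milliseconds."""
    wall_clock_seconds = (stop_ms - start_ms) / 1000.0
    if wall_clock_seconds <= 0:
        raise ValueError("job stop time must be after job start time")
    return input_bytes / wall_clock_seconds


def scaling_efficiency(runs):
    """Given (nodes, throughput) pairs, compare each run to the smallest one.

    Efficiency is (throughput / base_throughput) / (nodes / base_nodes),
    so 1.0 means throughput grew in proportion to the node count.
    """
    ordered = sorted(runs)
    base_nodes, base_throughput = ordered[0]
    return [(nodes, (tp / base_throughput) / (nodes / base_nodes))
            for nodes, tp in ordered]


# Hypothetical job: 1 GB of input processed in 100 seconds.
tp = throughput_bytes_per_second(1_000_000_000,
                                 1_292_000_000_000, 1_292_000_100_000)
# Hypothetical node-scalability runs: (nodes, throughput in MB/s).
eff = scaling_efficiency([(4, 100.0), (8, 190.0), (16, 320.0)])
print(tp, eff)
```

The same ratio applies to data scalability if the pairs hold input size rather than node count.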

We will illustrate the performance profile of two candidate Hadoop MapReduce applications in Part II of this exposition and conclude with a discussion of our analysis.

[1] http://ganglia.sourceforge.net

[2] http://sortbenchmark.org/Yahoo2009.pdf
