Apache Hadoop YARN: Avoiding 6 Time-Consuming "Gotchas"

Understanding some key differences between MR1 and MR2/YARN will make your migration much easier.

Here at Cloudera, we recently finished a push to get Cloudera Enterprise 5 (containing CDH 5.0.0 + Cloudera Manager 5.0.0) out the door along with more than 100 partner certifications.

CDH 5.0.0 is the first release of our software distribution where YARN and MapReduce 2 (MR2) is the default MapReduce execution framework, and the migration from MR1 to MR2 was surprisingly easy across our partner ecosystem. Partners certifying on MR2 during our beta program did not report problems running jobs compiled for MR1 on an MR2 cluster. Also, because switching a cluster from MR1 to MR2 is as easy as enabling the YARN service on Cloudera Manager while disabling the MR1 service, the administrative costs of swapping frameworks are trivial. Thus, 99 percent of our certifying partners made the switch to MR2 without major effort; the remaining one percent simply didn’t bother because Cloudera will continue to support MR1 for the duration of the Cloudera Enterprise 5 lifecycle.

The Cloudera Engineering team has done a great job explaining basic YARN concepts and documenting how to move from MR1 to YARN. However, partner certification work for Cloudera Enterprise 5 surfaced a set of six common issues. These issues highlight some key behaviors in MR2/YARN that are often overlooked or misunderstood. They are:

  1. YARN concurrency
  2. Performance differences under YARN
  3. Resource allocation versus resource consumption
  4. How logs are displayed and processed
  5. Live lock with concurrent job submission
  6. Killing of tasks due to virtual memory usage

Knowing these issues in advance will help you understand when your cluster doesn’t behave as expected. In this post, you’ll get an introduction to them.

1. YARN Concurrency (aka “What Happened to Slots?”)

After making the shift to MR2, partners observed a different number of concurrent tasks (usually more) than they expected, which had an impact on observed performance and utilization.

Prior to MR2, the number of concurrent mappers and reducers per node was specified by the administrator. In addition to specifying those numbers, the administrator set the default memory allocation per mapper or reducer via the mapred.child.java.opts and mapred.child.ulimit settings. Cloudera’s recommendation was to allocate mappers and reducers in a 3:2 ratio, with the total number of tasks kept below the total number of cores on the node. That recommendation aims at optimal cluster performance under heavy load, but the rigid slot-count model in MR1 sometimes leaves clusters underutilized.

In contrast, in MR2, the number of concurrent mappers and reducers (the old “slot count”) is calculated by YARN based on allocations made by the administrator. Each node dedicates some amount of memory (via the yarn.nodemanager.resource.memory-mb setting) and CPU (via the yarn.nodemanager.resource.cpu-vcores setting) to YARN for use by any YARN application. Because a MapReduce job is a YARN application, it asks YARN for containers in which to run its map and reduce tasks, sized by the settings mapreduce.[map|reduce].memory.mb and mapreduce.[map|reduce].cpu.vcores. The memory and CPU resources managed by YARN are pooled and shared between maps and reduces (and container requests from other frameworks). A node is eligible to run an MR2 task when its available memory and CPU can satisfy the task’s resource request.
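
To make the knobs concrete, here is a minimal sketch of where these settings live: yarn-site.xml (per NodeManager) for what each node offers to YARN, and mapred-site.xml (per job) for what each task requests. The property names are the ones discussed above; the values are purely illustrative and mirror the worked example below (the reduce-side numbers are assumptions of my own).

    <!-- yarn-site.xml: resources each NodeManager offers to YARN (example values) -->
    <property>
      <name>yarn.nodemanager.resource.memory-mb</name>
      <value>43008</value>  <!-- 42GB -->
    </property>
    <property>
      <name>yarn.nodemanager.resource.cpu-vcores</name>
      <value>16</value>
    </property>

    <!-- mapred-site.xml: resources each MapReduce task requests from YARN (example values) -->
    <property>
      <name>mapreduce.map.memory.mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>mapreduce.map.cpu.vcores</name>
      <value>1</value>
    </property>
    <property>
      <name>mapreduce.reduce.memory.mb</name>
      <value>4096</value>
    </property>
    <property>
      <name>mapreduce.reduce.cpu.vcores</name>
      <value>1</value>
    </property>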

This approach is an improvement over that of MR1, because the administrator no longer has to bundle CPU and memory into a Hadoop-specific concept of a “slot” — but for some people accustomed to MR1, it’s confusing. To help clear things up, consider the case of an idle cluster that has one large MapReduce job submitted to it that consumes all the available resources. How many tasks run at a time?

In MR1, the number of tasks launched per node was specified via the settings mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum. In MR2, you can determine how many concurrent tasks will launch per node by dividing the resources allocated to YARN by the resources requested by each MapReduce task, and taking the minimum across the two resource types (memory and CPU). Specifically, take the smaller of yarn.nodemanager.resource.memory-mb divided by mapreduce.[map|reduce].memory.mb and yarn.nodemanager.resource.cpu-vcores divided by mapreduce.[map|reduce].cpu.vcores. That minimum is the number of tasks that will be spawned per node.

This approach is illustrated in the spreadsheet shown below:

In the above example, a YARN cluster has 42GB of memory and 16 cores allocated to it per node. You can allocate capacity based on memory and CPU. When 2GB and one core are allocated to each map task, the CPU-based limit is the binding constraint, so 16 map tasks are spawned per node because that is the smaller value.
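
Working through the arithmetic behind that result: 42GB is 43,008 MB, so memory alone would permit 43,008 ÷ 2,048 = 21 map tasks per node, while CPU permits 16 ÷ 1 = 16 tasks; YARN runs the minimum of the two, 16.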

Of course, YARN is more dynamic than that, and each job can have unique resource requirements, so in a multitenant cluster with different types of jobs running, the calculation isn’t as straightforward. Also, map and reduce tasks come out of the same resource pool, so the mapreduce.job.reduce.slowstart.completedmaps setting (mapred.reduce.slowstart.completed.maps in MR1) also affects cluster behavior, since it controls how early resources can be allocated to the reduce phase.
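
For example, raising that threshold keeps reduce containers from being requested until most of a job’s maps have finished. The snippet below is only an illustrative mapred-site.xml sketch; the 0.80 value is an assumption for the sake of the example (the default is 0.05), and the right value depends on your workload.

    <!-- mapred-site.xml: fraction of a job's maps that must complete before
         reduce containers are requested (illustrative value; default is 0.05) -->
    <property>
      <name>mapreduce.job.reduce.slowstart.completedmaps</name>
      <value>0.80</value>
    </property>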

Regardless, I find the simple logic represented in a spreadsheet to be a good starting point for helping partners understand resource allocation to tasks in a YARN cluster. If you’d like, you can save this spreadsheet locally as a Microsoft Excel file and tinker with the bolded values yourself in order to gain a better understanding of how MapReduce jobs running on your cluster will behave under YARN.

2. Performance Differences under YARN

After upgrading to Cloudera Enterprise 5 and enabling the YARN service, administrators are often asked to demonstrate the improved performance using a widely accepted MapReduce job such as TeraSort or TestDFSIO. However, it’s important to ensure a proper performance comparison. TeraSort itself has also changed in this release: it now generates values that are less compressible and therefore cause more network transfer. The reported numbers from TeraSort might be disappointing unless you’re aware of that change.

Likewise, due to the slot calculation logic detailed above, your TestDFSIO run might not see the dramatic improvement you expect. TestDFSIO runs best when a task gets a dedicated CPU and a dedicated disk. When slot counts are specified, it’s easy to engineer this situation, but it’s trickier with YARN. If YARN calculates that it can run more concurrent TestDFSIO mappers than you have available disk, you’ll see contention between mappers and performance will suffer.

It’s therefore important to understand how benchmarks like TestDFSIO and TeraSort differ from real-world workloads. Benchmarks modeled on real-world workloads, such as SWIM, will better expose the benefits of YARN’s resource allocation.

2a: Cloudera Manager Defaults (an aside)

One partner reported degraded performance after a default installation of Cloudera Manager in an environment that had not previously used it. This partner was running a set of benchmarks on “out-of-the-box” configurations from various vendors, and Cloudera appeared to come up short. As it turned out, the various additional services installed by Cloudera Manager were causing the problem.

At install time, you can tell Cloudera Manager to install lots of services on the cluster, including resource-intensive services like Apache HBase, Cloudera Impala, and Cloudera Search. While this feature is fantastic for getting numerous Hadoop-related services up and running quickly, it also divides your cluster resources across the selected services, negatively impacting the results of any test that exercises a single service and expects full access to the cluster. If you’re benchmarking one service exclusively, get a fair result by stopping or removing the services you aren’t using.

3. Resource Allocation versus Resource Consumption

YARN’s ResourceManager handles resource allocation, but it does not specify the heap settings of the processes it spawns. For example, the memory the NodeManager provides to a mapper’s container is specified to YARN via the per-job mapreduce.map.memory.mb setting. However, the JVM itself has memory overhead, and the heap size available to the mapper is determined by a different setting, mapreduce.[map|reduce].java.opts, which specifies the heap size for the launched Java process. When the resources allocated by YARN don’t match the resources consumed by the MR2 job, you’ll see either cluster underutilization or jobs being killed, depending on the direction of the mismatch.

Until we can automate this check in Cloudera Manager, keep an eye on these two settings together; it’s easy to overlook one of them sitting at a default that’s too small for your jobs.
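
As a minimal mapred-site.xml sketch of keeping the two in step (the values are illustrative assumptions, not recommendations), set the container size and the JVM heap side by side; a common rule of thumb is to give the heap roughly 75 to 80 percent of the container, leaving headroom for JVM overhead:

    <!-- mapred-site.xml: what YARN allocates (container size) versus what the
         task JVM actually consumes (heap); example values only -->
    <property>
      <name>mapreduce.map.memory.mb</name>
      <value>2048</value>
    </property>
    <property>
      <name>mapreduce.map.java.opts</name>
      <value>-Xmx1638m</value>  <!-- roughly 80% of the 2048MB container -->
    </property>
    <property>
      <name>mapreduce.reduce.memory.mb</name>
      <value>4096</value>
    </property>
    <property>
      <name>mapreduce.reduce.java.opts</name>
      <value>-Xmx3276m</value>
    </property>

If the heap (plus the JVM’s off-heap overhead) outgrows the container, the NodeManager kills the task; if the container is much larger than the heap, the extra memory is allocated but never used.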

4. Changes in Logs and Debugging Techniques

In MR1, it’s fairly easy to debug a failing MapReduce job (or task) by using Cloudera Manager’s distributed log search feature (or a distributed grep) to search for the numeric portion of the job ID or task ID in the Hadoop logs. This will return HDFS operations that use that task ID as a client from the NameNode logs, scheduling operations from the JobTracker, and messages from the processes spawned by the TaskTrackers. Here is a screenshot of searching the MapReduce logs for a MapReduce job ID in a CDH 4.6 cluster running MR1. Job_201404101622_0001 is a randomwriter job:

That set of entries from numerous log files shows MapReduce-specific information, including job initialization, task assignment, and task launch of mapper 41. The Hadoop process logs show MapReduce-specific information that you can use to inform a troubleshooting exercise.

With YARN, MR2 is one of many processing frameworks that you can deploy. As such, the log files from YARN are more generic.

Here is the output of the same search by job ID for the same randomwriter job on a Cloudera Enterprise 5 cluster running YARN/MR2:

Not only has the terminology changed in the processes generating the log messages (you now see ResourceManager logs instead of JobTracker logs, for instance), but you can’t tell from the YARN messages that the “application” is a MapReduce job and the individual containers are, in fact, running mappers.

Container_1397175901787_01_000009 corresponds to a mapper, but its ID doesn’t follow the established MapReduce task naming convention of “attempt_jobid_m_task-id_attempt-id” in these logs; that attempt ID belongs to the task running inside the container, not to the container itself. Hadoop still provides this information, and the best way to retrieve it is starting from the ResourceManager web UI:

Although the application is referenced by an ID, you still don’t know what kind of application it is. However, when you click on the application ID, you discover it is MapReduce, and you can also drill down to the individual tasks from either phase:

One other notable change between MR1 and MR2 has nothing to do with YARN: upon completion of a job, its logs are stored in HDFS and the information about the job is shipped off to a dedicated server called the JobHistory Server. While this fixes a scalability issue in MR1, it does require administrators and developers to become accustomed to obtaining troubleshooting information from a new location.

One thing partners do still find tricky is correlating entries discussing individual containers in YARN logs with individual map or reduce tasks in MR logs. However, this isn’t really a necessary connection to make for most troubleshooting efforts, as you still have plenty of information at your disposal.

5. Live Lock with Concurrent Job Submission

One issue did come up with certification tests running on YARN: a logjam with concurrent job submission. Our certification suite spawns many jobs simultaneously, simulating the workload of a busy “real-world” cluster. The first runs of the suite on YARN hit a curious logjam in which the jobs would appear as submitted, but none of them would make any progress. The web UI showed lots of jobs in flight or in the “pending” state, but the cluster metrics came back idle.

Some investigation revealed that all available resources were allocated to application masters, leaving no room for tasks. The cluster sits in this state pretty much indefinitely. If you observe no progress in your jobs after submitting a bunch of them at the same time, this is probably what’s going on. We’ve filed this issue as a Fair Scheduler bug, but it’s a good condition to be aware of until this has been resolved.

Here’s a snapshot from a web UI of a cluster in this state:

If you see the issue, the resolution is fairly straightforward: configure the Fair Scheduler to put a limit on the number of concurrently running applications per pool. This limit is set in the scheduler configuration, which is surfaced in Cloudera Manager through the Dynamic Resource Pools editor. If you set it to a specific, reasonable value (perhaps somewhere between 2 and 8, depending on cluster capacity), the ResourceManager will leave resources for task containers in addition to the application masters.
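
For clusters managed outside Cloudera Manager, this is roughly what the Dynamic Resource Pools editor writes on your behalf: a per-pool limit in the Fair Scheduler allocation file. The snippet below is a minimal sketch only; the pool name (“default”) and the limit are assumptions for illustration.

    <!-- fair-scheduler.xml: cap concurrently running applications per pool so
         that application masters cannot consume every available container -->
    <allocations>
      <queue name="default">
        <maxRunningApps>4</maxRunningApps>
      </queue>
    </allocations>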

 

6. Killing of Tasks Due to Virtual Memory Usage

In Cloudera Enterprise 5 Beta 2, a few partners reported an issue of tasks being killed due to excessive virtual memory usage. In this situation, the NodeManager’s container monitor logs each container’s physical and virtual memory usage and kills the container when its virtual memory use exceeds the allowed limit.

Investigation revealed that this was happening on CentOS/RHEL 6 due to its aggressive allocation of virtual memory. A decent discussion of the issue is available elsewhere online.

In YARN, the workaround was to turn off this enforcement by setting yarn.nodemanager.vmem-check-enabled to false from its default of true. We’ve fixed this default for you in CDH 5.0.0, but if you see this happening sporadically during job executions on a YARN cluster, be sure to check this setting.
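
If you do hit this on a cluster where the check is still enabled, a minimal yarn-site.xml override on each NodeManager might look like the sketch below; as noted above, CDH 5.0.0 already ships with this as the default.

    <!-- yarn-site.xml: disable the NodeManager's virtual memory check -->
    <property>
      <name>yarn.nodemanager.vmem-check-enabled</name>
      <value>false</value>
    </property>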

Conclusion

Now that YARN/MR2 is production-ready, there are some real benefits to making the move. It solves key issues with MR1, particularly around JobTracker job-history handling and cluster underutilization. Plus, by separating resource allocation from process execution, it opens Hadoop up to different computation frameworks.

I hope this overview saves you some time in your own evaluation of YARN and saves you some time during your migration to MR2!

Jeff Bean has been at Cloudera since 2010, serving on our Solutions Architect Team, Support Team, Training Team, and more recently, Partner Engineering Team. With the release of Cloudera Enterprise 5, Jeff has been helping lots of partners migrate to YARN.


7 Responses
  • tony / May 11, 2014 / 7:49 AM

Where is the virtual memory (5 MB and 1 GB) set? I ran one SQL query in impala-shell, but it was killed for virtual memory. The log is:

    2014-05-08 16:19:13,611 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 5909 for container-id container_1399526892133_0003_01_000001: 576 KB of 1 GB physical memory used; 98.5 MB of 1 GB virtual memory used
    2014-05-08 16:19:14,252 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl: Memory usage of ProcessTree 5914 for container-id container_1399526892133_0003_01_000003: 576 KB of 5 MB physical memory used; 98.5 MB of 5 MB virtual memory used

    • Jeff Bean / June 21, 2014 / 10:54 PM

That looks like the yarn.nodemanager.vmem-check-enabled setting discussed in gotcha #6.

  • abvs / May 23, 2014 / 6:08 AM

How does ‘mapreduce.job.reduce.slowstart.completedmaps’ affect the container allocation for the mappers in your example?

    • Jeff Bean / May 27, 2014 / 9:54 AM

mapreduce.job.reduce.slowstart.completedmaps specifies what fraction of mappers need to finish before resources are allocated for reducers. Setting this number low holds cluster resources aside for reducers and prevents those resources from being used by other tasks. Setting this number high frees the resources for other tasks but might leave reducers starved for data as the keys and values shuffle-sort over the network.

  • David / June 29, 2014 / 8:30 PM

    Hi Jeff, very useful article. There’s a small (though fatal!) typo: yarn.nodemanager.vmem-check.enabled should be yarn.nodemanager.vmem-check-enabled.

  • Abhi Nellore / August 08, 2014 / 11:27 AM

    Is the formula for number of tasks launched per node from gotcha #1 mentioned here unique to CDH or universal? My StackOverflow question elaborates: http://stackoverflow.com/questions/25193201/how-to-set-the-precise-max-number-of-concurrently-running-tasks-per-node-in-hado .

    • Jeff Bean / September 11, 2014 / 11:30 AM

      Hi Abhi,

      The formula for tasks launched per node is not unique to CDH. Cloudera would always commit changes upstream before making such a change.


