Cloudera and Intel engineers are collaborating to make Spark’s shuffle process more scalable and reliable. Here are the details about the approach’s design.
What separates computation engines like MapReduce and Apache Spark (the next-generation data processing engine for Apache Hadoop) from embarrassingly parallel systems is their support for “all-to-all” operations. As distributed engines, MapReduce and Spark operate on sub-slices of a dataset partitioned across the cluster. Many operations process a single data point at a time and can be carried out fully within each partition. All-to-all operations must consider the dataset as a whole; the contents of each output record can depend on records that come from many different partitions. In Spark, transformations such as reduceByKey are popular examples of these types of operations.
In these distributed computation engines, the “shuffle” refers to the repartitioning and aggregation of data during an all-to-all operation. Understandably, most performance, scalability, and reliability issues that we observe in production Spark deployments occur within the shuffle.
Cloudera and Intel engineers are collaborating on work to augment Spark’s shuffle so that it can handle large datasets more quickly and more reliably. While Spark has advantages over MapReduce in many respects, it is still catching up on scalability and reliability. We’ve borrowed concepts from the battle-tested MapReduce shuffle implementation to improve shuffle operations that output sorted data.
In this post, we’ll survey the workings of the current Spark shuffle implementation, our proposed changes, and how they improve its performance. Work is in progress upstream at SPARK-2926.
Current State of Affairs
A shuffle involves two sets of tasks: tasks from the stage producing the shuffle data and tasks from the stage consuming it. For historical reasons, the tasks writing out shuffle data are known as “map tasks” and the tasks reading the shuffle data are known as “reduce tasks.” These roles are defined with respect to a particular shuffle within a job. A task might be a reduce task in one shuffle, where it’s reading data, and then a map task for the next shuffle, where it’s writing data to be consumed by a subsequent stage.
The MapReduce and Spark shuffles use a “pull” model. Every map task writes out data to local disk, and then the reduce tasks make remote requests to fetch that data. As shuffles sit underneath all-to-all operations, any map task may have some set of records that are meant for any reduce task. The job of the map side of the shuffle is to write out records in such a way that all records headed for the same reduce task are grouped next to each other for easy fetching.
Spark’s original shuffle (“hash-based shuffle”) implementation accomplished this goal by opening a file in each map task for each reduce task. This approach has a simplicity advantage but runs into a few issues. For example, Spark must either use lots of memory holding a buffer over each file or incur lots of random disk I/O. Furthermore, if M and R are the number of map and reduce tasks in a shuffle, hash-based shuffle requires a total of M * R intermediate files. Shuffle consolidation work reduced this to C * R intermediate files, where C is the number of map tasks that can run at the same time. But even with this change, users would often run into the “Too many open files” ulimit when running jobs with non-trivial numbers of reducers.
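To make the file-count arithmetic above concrete, here is a minimal sketch. The function name and the job sizes are hypothetical, chosen only for illustration; the formulas M * R and C * R are the ones stated in the paragraph.

```python
def hash_shuffle_file_count(num_map_tasks, num_reduce_tasks, concurrent_map_tasks=None):
    """Intermediate files created by hash-based shuffle.

    Without consolidation: M * R.
    With consolidation: C * R, where C is the number of map tasks
    that can run at the same time.
    """
    if concurrent_map_tasks is None:
        return num_map_tasks * num_reduce_tasks
    return concurrent_map_tasks * num_reduce_tasks


# Hypothetical job: 10,000 map tasks, 1,000 reduce tasks, 16 concurrent map tasks.
M, R, C = 10_000, 1_000, 16
print(hash_shuffle_file_count(M, R))     # 10000000
print(hash_shuffle_file_count(M, R, C))  # 16000
```

Even the consolidated count grows linearly with the number of reducers, which is why the open-file limit remained a problem.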
Single map task in hash-based shuffle
Single map task in sort-based shuffle
To further improve the scalability and performance of shuffle, starting with release 1.1, Spark introduced a “sort-based shuffle” implementation that is similar to the map-side approach used by MapReduce. In this implementation, the map output records from each task are kept in memory until they can’t fit. At that point they are sorted by the reduce task for which they are destined and then spilled to a single file. If this process occurs multiple times within a task, the spilled segments are merged later.
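The map-side behavior described above can be sketched in plain Python. This is a toy model and not Spark’s implementation: the class name, the record-count threshold standing in for a memory limit, and the modulo partitioner for integer keys are all assumptions made for illustration.

```python
import heapq
import os
import pickle
import tempfile


class SortShuffleWriterSketch:
    """Toy map-side writer: buffer records, spill sorted runs, merge them later."""

    def __init__(self, num_reducers, max_buffered_records):
        self.num_reducers = num_reducers
        # Assumption: a record count stands in for "records no longer fit in memory".
        self.max_buffered_records = max_buffered_records
        self.buffer = []       # (reduce_partition, key, value) tuples held in memory
        self.spill_paths = []  # one file per spill, each sorted by reduce partition

    def insert(self, key, value):
        partition = key % self.num_reducers  # hypothetical partitioner for int keys
        self.buffer.append((partition, key, value))
        if len(self.buffer) >= self.max_buffered_records:
            self._spill()

    def _spill(self):
        # Sort only by destination reduce partition, then write one file.
        self.buffer.sort(key=lambda record: record[0])
        fd, path = tempfile.mkstemp(suffix=".spill")
        with os.fdopen(fd, "wb") as out:
            for record in self.buffer:
                pickle.dump(record, out)
        self.spill_paths.append(path)
        self.buffer = []

    @staticmethod
    def _read_spill(path):
        # Stream records back from a spill file one at a time.
        with open(path, "rb") as spill:
            while True:
                try:
                    yield pickle.load(spill)
                except EOFError:
                    return

    def merged_records(self):
        """Merge spilled segments and the in-memory remainder, grouped by partition."""
        self.buffer.sort(key=lambda record: record[0])
        runs = [self._read_spill(path) for path in self.spill_paths]
        runs.append(iter(self.buffer))
        yield from heapq.merge(*runs, key=lambda record: record[0])
        # Spill files are removed only once the merge has been fully consumed.
        for path in self.spill_paths:
            os.remove(path)
        self.spill_paths = []


writer = SortShuffleWriterSketch(num_reducers=3, max_buffered_records=4)
for key in [5, 1, 9, 4, 7, 2, 8, 3, 6, 0]:
    writer.insert(key, f"value-{key}")
partitions = [partition for partition, _key, _value in writer.merged_records()]
print(partitions)  # [0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
```

Because each spill is already sorted by partition, the later merge only has to interleave sorted runs, and all records headed for the same reduce task end up adjacent to each other.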
On the reduce side, a set of threads are responsible for fetching the remote map output blocks. As each block comes in, its records are deserialized and passed into a data structure appropriate to the all-to-all operation being carried out. For aggregation operations like aggregateByKey, the records are passed into an ExternalAppendOnlyMap, which is essentially a hash map that can spill to disk when it overflows memory. For ordering operations like sortByKey, records are passed into an ExternalSorter, which sorts them, possibly spilling to disk, and returns an iterator over them in sorted order.
Full Sort-based Shuffle
There are two disadvantages to the approaches described above:
- Each Spark reduce task holds many deserialized records in memory at once. Large numbers of Java objects put pressure on the JVM’s garbage collector, which can lead to slowdowns and pauses. They also take up more memory than their serialized versions, meaning that Spark must spill earlier and more often, incurring more disk I/O. Furthermore, it’s difficult to determine the memory footprint of deserialized objects with 100% accuracy, so holding more of them opens up more possibilities for out-of-memory errors.
- When conducting an operation that requires sorting the records within partitions, we end up sorting the same data twice: first by partition in the mapper, and then by key in the reducer.
Our change sorts the records by key within the partitions on the map side as well. Then, on the reduce side, we need only to merge the sorted blocks coming in from each map task. We can store the blocks in memory in serialized form and deserialize a record at a time as we merge. Thus the maximum number of deserialized records in memory at any time is the number of blocks we’re merging together.
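The reduce-side idea can be illustrated with a small sketch: blocks stay in memory as serialized bytes, and records are deserialized one at a time as a k-way merge consumes them. The function names are hypothetical, and pickle stands in for whatever serializer Spark is configured with; this is not the SortShuffleReader code.

```python
import heapq
import io
import pickle


def serialize_block(sorted_records):
    """Pack an already key-sorted list of (key, value) pairs into bytes."""
    buffer = io.BytesIO()
    for record in sorted_records:
        pickle.dump(record, buffer)
    return buffer.getvalue()


def iterate_block(block_bytes):
    """Lazily deserialize one record at a time from a serialized block."""
    stream = io.BytesIO(block_bytes)
    while True:
        try:
            yield pickle.load(stream)
        except EOFError:
            return


def merge_sorted_blocks(blocks):
    """K-way merge by key; the heap holds one deserialized record per block."""
    return heapq.merge(*(iterate_block(b) for b in blocks), key=lambda kv: kv[0])


# Three hypothetical map outputs, each already sorted by key on the map side.
block_from_map_0 = serialize_block([("apple", 1), ("melon", 4)])
block_from_map_1 = serialize_block([("banana", 2), ("zebra", 9)])
block_from_map_2 = serialize_block([("cherry", 3)])

merged = list(merge_sorted_blocks([block_from_map_0, block_from_map_1, block_from_map_2]))
print(merged)
# [('apple', 1), ('banana', 2), ('cherry', 3), ('melon', 4), ('zebra', 9)]
```

The merge never needs more than one live record per input block, which is the property that bounds the number of deserialized objects on the reduce side.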
Single map task in full sort-based shuffle
A single reduce task can receive blocks from thousands of map tasks. To make this many-way merge efficient, especially in the case where the data does not fit in memory, we introduce a tiered merger. When we need to merge many on-disk blocks, the tiered merger conducts merges on subsets of the blocks to minimize disk seeking. This tiered merge is applicable to the merge steps inside ExternalSorter as well, but we haven’t yet modified them to take advantage of it.
For each task, a set of threads are responsible for concurrently fetching shuffle data. A per-task memory pool of 48MB is used as a landing spot for fetched data. The SortShuffleReader we are introducing is responsible for taking blocks from there and exposing an iterator over [Key, Value] pairs to the user code.
Spark maintains a main shuffle memory zone shared across all tasks whose default size is 20% of the full executor heap. As blocks come in, the SortShuffleReader tries to acquire shuffle memory from this main zone for them. Serialized blocks fill up in memory until an attempt to acquire memory fails. At this point, we need to spill data to disk to free up space. The SortShuffleReader merges all (well, not actually all; sometimes it’s better to only spill a few) in-memory blocks into a single sorted on-disk file. As blocks pile up on disk, a background thread monitors them and merges them into larger sorted on-disk blocks if necessary. The “final merge” that feeds the Iterator passed to user code merges the final set of on-disk blocks with any blocks remaining in memory.
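The acquire-then-spill flow described above can be sketched as follows. Everything here is a simplified model under stated assumptions: the class names are hypothetical, Python lists stand in for serialized blocks and on-disk files, the pool is not shared between tasks, the background disk-to-disk merge is omitted, and a block that still does not fit after a spill is assumed to go straight to disk.

```python
import heapq


class ShuffleMemoryPool:
    """Toy stand-in for the shuffle memory zone (single task, no sharing)."""

    def __init__(self, capacity_bytes):
        self.capacity_bytes = capacity_bytes
        self.used_bytes = 0

    def try_acquire(self, num_bytes):
        if self.used_bytes + num_bytes > self.capacity_bytes:
            return False
        self.used_bytes += num_bytes
        return True

    def release(self, num_bytes):
        self.used_bytes -= num_bytes


class SpillingBlockBuffer:
    def __init__(self, pool):
        self.pool = pool
        self.in_memory_blocks = []  # (sorted_block, size_bytes) charged to the pool
        self.on_disk_blocks = []    # each entry models one sorted on-disk file

    def add_block(self, sorted_block, size_bytes):
        if self.pool.try_acquire(size_bytes):
            self.in_memory_blocks.append((sorted_block, size_bytes))
            return
        # Acquisition failed: merge the in-memory blocks into one sorted "file".
        self._spill()
        if self.pool.try_acquire(size_bytes):
            self.in_memory_blocks.append((sorted_block, size_bytes))
        else:
            # Assumption: a block that can never fit is written directly to disk.
            self.on_disk_blocks.append(list(sorted_block))

    def _spill(self):
        if not self.in_memory_blocks:
            return
        blocks = [block for block, _size in self.in_memory_blocks]
        self.on_disk_blocks.append(list(heapq.merge(*blocks)))
        self.pool.release(sum(size for _block, size in self.in_memory_blocks))
        self.in_memory_blocks = []

    def final_merge(self):
        """Merge the on-disk blocks with whatever is still in memory."""
        in_memory = [block for block, _size in self.in_memory_blocks]
        return heapq.merge(*self.on_disk_blocks, *in_memory)


reader = SpillingBlockBuffer(ShuffleMemoryPool(capacity_bytes=100))
for block in ([1, 5, 9], [2, 6], [3, 7], [4, 8]):
    reader.add_block(block, size_bytes=40)  # hypothetical block size
print(len(reader.on_disk_blocks), len(reader.in_memory_blocks))  # 1 2
print(list(reader.final_merge()))  # [1, 2, 3, 4, 5, 6, 7, 8, 9]
```

In this run the third block triggers a spill of the first two, so the final merge combines one on-disk block with two in-memory blocks.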
How do we decide when an intermediate disk-to-disk merge is necessary? The spark.shuffle.maxMergeFactor property (defaulting to 100) controls the maximum number of on-disk blocks that may be merged at once. When the number of on-disk blocks exceeds this limit the background thread runs a merge to bring this number down (but not immediately; more details in the code). In deciding how many blocks to merge, the thread first minimizes the number of merges it carries out, and, within that number, tries to merge as few blocks as possible. Consequently, if spark.shuffle.maxMergeFactor is 100 and the final number of on-disk blocks is 110, it only merges 11 blocks together, which puts the final number of on-disk blocks at exactly 100. Merging any fewer blocks would require an additional merge, and merging any more blocks would result in unnecessary disk I/O.
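The arithmetic behind that example can be written down as a short sketch. The function below is a hypothetical illustration of the stated policy (fewest merges first, then fewest blocks merged); it is not taken from the SPARK-2926 code, and it does not model which blocks are chosen or when the background thread runs.

```python
def plan_intermediate_merges(num_blocks, max_merge_factor):
    """Return (widths of intermediate merges, blocks left for the final merge)."""
    if max_merge_factor < 2:
        raise ValueError("max_merge_factor must be at least 2")
    widths = []
    remaining = num_blocks
    while remaining > max_merge_factor:
        # A merge of width w replaces w blocks with 1, removing w - 1 blocks.
        # Doing the one undersized merge first lets every later merge run at
        # full width and leaves exactly max_merge_factor blocks at the end.
        leftover = (remaining - 1) % (max_merge_factor - 1)
        width = max_merge_factor if leftover == 0 else leftover + 1
        widths.append(width)
        remaining -= width - 1
    return widths, remaining


print(plan_intermediate_merges(110, 100))  # ([11], 100)
print(plan_intermediate_merges(6, 4))      # ([3], 4)
print(plan_intermediate_merges(10, 4))     # ([4, 4], 4)
print(plan_intermediate_merges(50, 100))   # ([], 50)
```

With 110 blocks and a factor of 100, a single merge of 11 blocks removes exactly the 10 surplus blocks; a narrower merge would leave more than 100 blocks, and a wider one would rewrite data for no benefit.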
Tiered merge with maxMergeWidth=4. Each rectangle is an on-disk segment. Three segments are merged into a single segment and then the final four segments are merged into the iterator that’s fed to the next operation.
Performance Comparison with sortByKey
We tested using SparkPerf’s sort-by-key workload to assess the performance impact of our change. We chose two datasets of different sizes to compare the performance gain of our change both when memory is sufficient to hold all the shuffle data and when it is not.
A sortByKey transformation results in two jobs and three stages:
- Sample stage: Sample the data to create a range-partitioner that will result in an even partitioning.
- “Map” stage: Write the data to the shuffle bucket destined for the reduce stage.
- “Reduce” stage: Fetch the relevant shuffle output and merge or sort it for the specific partition of the dataset.
The benchmarks were conducted on a 6-node cluster. Each executor had 24 cores and 36GB of memory. The large dataset had 20 billion records, which, compressed, occupied 409.8GB on HDFS. The small dataset had 2 billion records, which, compressed, occupied 15.9GB on HDFS. Each record was a key-value pair of two 10-character strings. The sort was conducted over 1000 partitions in both cases. The charts below show the running time of each stage and of the job as a whole:
Large dataset (lower is better)
Small dataset (lower is better)
The sample stage time remains the same because it doesn’t involve a shuffle. In the map stage, because our improvement now sorts data by key within each partition, the stage running time increases (by 37% for the large dataset and 27% for the small dataset). However, the extra time is more than compensated for in the reduce stage, which now only needs to merge the sorted data. The duration of the reduce stage is reduced by over 66% on both of the datasets, leading to a 27% total speedup on the large dataset and 17% total speedup on the small dataset.
SPARK-2926 is one of a few planned improvements to Spark’s shuffle machinery. In particular, there are many ways that the shuffle can manage its memory better:
- SPARK-4550 tracks storing memory-buffered map output data as raw bytes instead of Java objects. This work will allow map output data to take up less space and thus result in fewer spills, as well as faster comparisons on raw bytes.
- SPARK-4452 tracks allocating memory more carefully between different shuffle data structures that make use of it, as well as giving no-longer needed memory back sooner.
- SPARK-3461 tracks streaming over the values that correspond to a particular key when passing on the results of a groupBy or join, instead of loading them all into memory at once.
Sandy Ryza is a Data Scientist at Cloudera, an Apache Spark committer, and an Apache Hadoop PMC member. He is a co-author of the O’Reilly Media book, Advanced Analytics with Spark.
Saisai (Jerry) Shao is a Software Engineer at Intel and a Spark contributor.