# Do the Schimmy: Efficient Large-Scale Graph Analysis with Apache Hadoop, Part 2

- by Jon Zuanich (@jonzuanich)
- November 18, 2010
- 2 comments

*Continued Guest Post from* Michael Schatz *and* Jimmy Lin

**Part 2: Efficient Graph Analysis in Hadoop with Schimmy**

In part 1, we looked at how extremely large graphs can be represented and analyzed in Apache Hadoop/MapReduce. Here in part 2 we will examine this design in more depth to identify inefficiencies, and present some simple solutions that can be applied to many Hadoop/MapReduce graph algorithms. The speedup using these techniques is substantial: as a prototypical example, we were able to reduce the running time of PageRank on a webgraph with 50.2 million vertices and 1.4 billion edges by as much as 69% on a small 20-core Hadoop cluster at the University of Maryland (full details available here). We expect that similar levels of improvement will carry over to many of the other problems we discussed before (the Kevin Bacon game, and DNA sequence assembly in particular).

As explained in part 1, when processing graphs in MapReduce, the mapper emits “messages” from vertices to their neighbors, and the shuffle phase effectively routes each message to the proper destination vertex. For computationally trivial problems such as PageRank, the running time is dominated by shuffling data across the network, so anything we can do to reduce that data will also decrease the running time. Fortunately, in many cases we can reduce the data volume by combining multiple messages destined for the same vertex into a single message using a combiner – remember, a combiner is a “mini-reducer” that executes a function on a subset of the values with the same key. Because the subset may be unstable from run to run, combiners can only be safely used if the function is associative and commutative – that is, when the order that values are processed doesn’t change the result.

For computations like PageRank, where the reducer merely sums the values received from neighboring vertices, we can safely sum values in any order, including summing values in a combiner before the messages are even sent across the network. If there are many messages destined for the same vertex emitted from mappers on a single machine, the combiner can replace all those messages with just one, and save substantial network communication costs (without changing the final result). Using combiners is a standard best practice for algorithms in MapReduce (graphs and otherwise), and in our experiments it improved the running time of PageRank by 18%.

As a novel twist, we also evaluated a technique called “in-mapper combiners” instead of generic combiners. The idea is instead of having the mapper immediately emit messages, it first buffers messages inside a mapper-local hash table stored in memory. If we get lucky, the mapper will create many messages destined to the same vertices right after each other, and we can immediately update the combined values in memory, without having to pay the costs to serialize, write, sort, read, and deserialize the individual messages from disk. Although, we have to periodically flush the in-mapper table so we don’t overflow the memory available on the machine. You can see where the name of this technique comes from: in essence, we are doing the combining right inside the mapper! In our experiments, using in-mapper combiners further improved performance by another 16% beyond the generic combiner.

If you have a commutative and associative computation, combiners will almost always help the performance of your algorithm. The magnitude of the improvement (18% vs. 5% vs. 34%), though, will very much depend on your cluster environment and the distribution of the graph in that environment. In the worst case, none of the vertices stored on a single machine share a common neighbor, so the combiner (or in-mapper combiner) won’t help at all. What we would like is all neighboring vertices (or all tightly connected vertices) are stored on the same machine so that all their messages can be efficiently combined. Unfortunately, optimally distributing the graph like this is a hard clustering problem in itself, but in some cases we can use simple heuristics that are very effective.

The default method for assigning vertices to machines (both when the graph is stored in HDFS and during a MapReduce job) is a more-or-less random function called the HashPartitioner, which computes the hash value of the vertex id (key) to partition the graph. As a random function, this approach works well to balance the number of vertices processed and stored on each machine. Furthermore, if the graph is roughly uniformly connected, then there is a good chance that some neighboring vertices will be processed together, which will then benefit from the combiner. However, many real world networks have higher-order structure that this approach won’t capture. For example, the Web is organized into domains, and webpages in the same domain generally have many more links to each other than to other remote corners of the Web. The default HashPartitioner is totally blind to this structure, which means the combiner will lose opportunities to aggregate partial results.

We could exploit some of the network structure if we could only assign graph vertices to machines in blocks by domain instead of randomly. Surprisingly, this level of control is relatively easy to implement: instead of referencing vertices by some arbitrary vertex id, we instead use consecutive numbers derived from some attribute of the vertices. For example, we could preprocess the webgraph and alphabetically sort all the URLs. We then renumber vertices based on the sort order (i.e. http://aaa.com/index.html is id 1, http://aaa.com/welcome.html is id 2, http://abc.com/index.com id 3, etc). By virtue of the sorting, pages from the same domain will form blocks of consecutive ids.

The only remaining challenge is to partition the vertices using ranges instead of hash values. This can be achieved using a variant of the RangePartitioner used for the Terasort challenge, where Hadoop/MapReduce is used to globally sort a list of values by partitioning the space of keys. Conceptually, webpages 1-100,000 are processed together, webpages 100,001 – 200,000 are processed together, and so forth (the actual size of the blocks will vary). This way, each web domain will be assigned to a single or perhaps a few machines and thus the combiner will be much more effective. If the range boundaries don’t exactly coincide with the domain boundaries, it causes a small bit of missed locality, but overall using a RangePartitioner leads to a huge improvement in performance over the HashPartitioner: in our experiments, a 40% improvement in running time of PageRank on our webgraph just by renumbering and repartitioning the graph!

The final inefficiency we explored was if there was a way to more effectively distribute the graph structure within an iterative MapReduce graph algorithm. Remember in the standard design, the mapper emits messages for neighboring vertices, and also reemits the graph vertices themselves so that they will be shuffled together to the same reducer. This ensures the graph structure is available in the right reducer, but is not efficient. For example, if we store many attributes in the graph vertices (links to neighbors, text of the webpage, embedded images, date of collection, etc), and only distribute messages to a small number of neighbors (perhaps just from vertices with a certain keyword), the vast majority of data emitted and shuffled will be the graph structure with hardly any computationally meaningful data exchanged.

This spring, we asked ourselves if it was truly necessary to do so, especially when the graph structure does not change at all between iterations in PageRank. As a result of this discussion, we came up with a new technique called Schimmy (hint: this is how Schatz + Jimmy think about graphs) that separates the messages (mutable-data flow) from the graph structure (immutable-data flow). The two key observations that make it work are 1) the partitioning of vertices (assignment of vertices to machines) is stable across MapReduce iterations so that conceptually once a vertex is assigned to machine *X*, it is always processed on machine *X*, and 2) it is possible to merge sort intermediate key-value pairs with “side data” in the reducer. By default, MapReduce only sorts key-value pairs emitted by the mappers, but if we are willing to go a little outside the standard APIs, we can merge together messages and vertices from separate sorted data streams.

In Schimmy, the mapper iterates over the input vertices more-or-less as before but only emits messages and not the vertex tuples (i.e., graph structure). The messages are then combined and shuffled as before to route them to the proper destination machines. Then, at the last possible moment, the reducer code merges together the shuffled messages with the vertices (the same files that the mappers processed). The reducer then computes the new values for each vertex, and stores the final updated graph as before. If you’re a database guru, you might describe this as a reduce-side parallel merge join between vertices and messages.

In essence, Schimmy short-circuits reshuffling the vertices because the vertices get shuffled the same way in every iteration. This slight change saves substantial time, between 10% and 20% of each iteration in our prototypical analysis of PageRank, leading to an overall saving of 69% when used in conjunction with RangePartitioning and in-mapper combining over the previous best practice of using a combiner. In some ways, though, PageRank is the worst case for Schimmy in that PageRank sends messages from every vertex to every neighbor in every iteration. In less extreme algorithms (such as the first round of the Kevin Bacon game), where only a small fraction of vertices send messages to their neighbors, Schimmy should be even more effective.

In conclusion, Schimmy and in-mapper combiners make graph algorithms faster because they reduce the total amount of computation and the total volume of data to exchange. In-mapper combining is also effective because RAM is orders of magnitude faster than disk and because it can then completely skip the work of serializing & deserializing intermediate key-value pairs. RangePartitioning is effective because it can drastically improve locality, leading to more opportunities for local aggregation (and hence less data to shuffle across the network). These ideas are widely applicable (and perhaps obvious in retrospect), but are often overlooked in distributed environments where the initial temptation may be to simply throw more machines at the problem. While a brute force approach makes sense for exploratory data analysis, once we know what we need, it pays to refine the algorithm by exploiting locality and reducing data volumes, especially at the slowest levels of the storage hierarchy. This is simply good computer science!

We hope you have found these articles interesting! For a more in depth discussion of these techniques, please see the Schimmy paper referenced above and the Schimmy reference implementation in Cloud9, which is a library developed at the University of Maryland designed to serve as both a teaching tool and to support research in data-intensive text processing.

Note, for clarity we have glossed over the details of how tasks and file splits are scheduled on worker machines. In particular, file splits are arbitrarily scheduled on different machines in different iterations (the assignment of tasks is unstable), but reduce tasks can peek inside the splits to simulate a stable scheduling. To all the Hadoop hackers out there – we would love to work with you to develop a stable task scheduler that could be used to further cut unnecessary network traffic and improve performance. There are some significant technical challenges to make this approach work, especially to do so while ensuring reliability, but such an addition would enable some advanced techniques that are currently not possible. Ideally, this would lead to the development of a premier open-source large-scale graph analysis system, built directly on top of Hadoop MapReduce. Perhaps it will eventually prove superior to Google’s latest closed-source graph processing system Pregel.

## Filed under:

2 ResponsesAkshay Bhat / November 23, 2010 / 6:44 PMIt would be interesting to see how it stacks up against Pegasus from CMU.