Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle


Our thanks to Ilya Ganelin, Senior Data Engineer at Capital One Labs, for the guest post below about his hard-earned lessons from using Spark.

I started using Apache Spark in late 2014, learning it at the same time as I learned Scala, so I had to wrap my head around the various complexities of a new language as well as a new computational framework. This process was a great in-depth introduction to the world of Big Data (I previously worked as an electrical engineer for Boeing), and I very quickly found myself deep in the guts of Spark. The hands-on experience paid off; I now feel extremely comfortable with Spark as my go-to tool for a wide variety of data analytics tasks, but my journey here was no cakewalk.

Capital One’s original use case for Spark was to surface product recommendations for a set of 25 million users and 10 million products, one of the largest datasets available for this type of modeling. Moreover, we had the goal of considering all possible products, specifically to capture the “long tail” (AKA less frequently purchased items). That problem is even harder, since to generate all possible pairings of user and products, you’d have 250e12 combinations—which is more data than you can store in memory, or even on disk. Not only were we ingesting more data than Spark could readily handle, but we also had to step away from the standard use case for Spark (batch processing with RDDs) and actually decompose the process of generating recommendations into an iterative operation.

Learning how to properly configure Spark, and use its powerful API in a way that didn’t cause things to break, taught me a lot about its internals. I also learned that at the time, there really wasn’t a consolidated resource that explained how those pieces fit together. The end goal of Spark is to abstract away those internals so the end-user doesn’t have to worry about them, but at the time I was using it (and to some degree today), to write efficient and functional Spark code you need to know what’s going on under the hood. This blog post is intended to reveal just that: to teach the curious reader what’s happening, and to highlight some simple and tangible lessons for writing better Spark programs.

Note: this post is not intended as a ground-zero introduction. Rather, the reader should have some familiarity with Spark’s execution model and the basics of Spark’s API.

The Pieces

First, let’s review the key players in the Spark infrastructure. I won’t touch on all the components, but there are a few basics that I’ll cover. These are partitioning, caching, serialization, and the shuffle operation.

Partitions

Spark’s basic abstraction is the Resilient Distributed Dataset, or RDD. The RDD is how Spark simplifies complex operations like join or groupBy and hides the fact that under the hood, you’re dealing with fragmented data. That fragmentation is what enables Spark to execute in parallel, and the level of fragmentation is a function of the number of partitions of your RDD. The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. Furthermore, since with fewer partitions there’s more data in each partition, you increase the memory pressure on your program. On the flip side, with too many partitions, your performance may degrade as you take a greater hit from network and disk I/O. Ultimately this concept ties into Spark’s notion of parallelism and how you can tune it (see the discussion of tuning parallelism here) to optimize performance.
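
To make that concrete, here is a minimal sketch of inspecting and adjusting an RDD's partitioning; the file path and partition counts are illustrative, not recommendations from the original project:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partition-demo"))

val lines = sc.textFile("hdfs:///data/events.txt")
println(s"Initial partitions: ${lines.partitions.length}")

// More partitions -> more parallel tasks and smaller chunks per task (full shuffle).
val widened = lines.repartition(200)

// Fewer partitions without a shuffle, e.g. before writing a small result.
val narrowed = widened.coalesce(50)
```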

Caching

Spark is a big deal for two main reasons: first, as already mentioned, it has a really simple and useful API for complex processes. The other big thing is that unlike your standard MapReduce program, Spark lets you cache intermediate results in memory. Caching can dramatically improve performance if you have data structures that are used frequently, such as a lookup table or a matrix of scores in a machine-learning algorithm. Caching can also introduce problems since it will often require huge chunks of memory; tuning this process is its own challenge, but doing so can increase performance by several orders of magnitude.
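
As a hedged illustration (the file path and record layout are made up, and an existing SparkContext sc is assumed), caching a frequently reused pair RDD looks like this:

```scala
import org.apache.spark.storage.StorageLevel

// A hypothetical (userId, score) RDD that several later steps reuse.
val scores = sc.textFile("hdfs:///data/scores.csv")
  .map(_.split(","))
  .map(fields => (fields(0), fields(1).toDouble))

scores.persist(StorageLevel.MEMORY_ONLY)   // cache() is shorthand for this level

val total = scores.values.sum()   // first action computes and caches the RDD
val users = scores.count()        // second action reads from the cache instead of recomputing
```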

Serialization

In distributed computing, you generally want to avoid shipping data back and forth because it’s expensive. Instead, the common paradigm is to bring your code to your data. This is why many frameworks are based on the JVM, which lets you execute code on the same machine as the data. Serialization is the process of translating code and data into a compact (ideally compressed) format for efficient transfer over the network. By default, Spark uses the standard Java serializer. However, you can get much faster and more memory-efficient serialization using Kryo. Switching to Kryo can reduce memory pressure on your cluster and improve stability.
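
Enabling Kryo is a configuration change; in the sketch below, MyRecord is a made-up case class standing in for whatever classes travel through your shuffles or caches:

```scala
import org.apache.spark.SparkConf

case class MyRecord(userId: Long, productId: Long, score: Double)

val conf = new SparkConf()
  .setAppName("kryo-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a compact ID instead of the full class name.
  .registerKryoClasses(Array(classOf[MyRecord]))
```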

Shuffle

Even though moving data is expensive, sometimes it’s necessary. For example, certain operations need to consolidate data on a single node so that it can be co-located in memory, such as when you want to perform a reduce operation across all values associated with a particular key of a key-value RDD (reduceByKey()). This expensive reorganization of data is known as the shuffle. The shuffle involves serialization as well as Akka, Spark’s internal messaging system, thereby consuming disk and network I/O while increasing memory pressure from garbage collection. Improperly configured Akka settings or serialization settings can cause problems depending on the size of your data. There is an excellent write-up of what happens during a shuffle in this Cloudera Engineering blog post.
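
For example, a classic word count forces a shuffle because every count for a given word must end up on the same node before the final sum (the input path below is invented):

```scala
val counts = sc.textFile("hdfs:///data/corpus.txt")
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // partial sums per partition, then a shuffle, then the final sums
```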

Lessons

Next, let’s delve into how these components all come together when you write a Spark program and, specifically, what lessons I’ve learned about tying all these pieces together.

Lesson 1: Spark Gorges on Memory

As I mentioned previously, part of Spark’s power is its ability to cache things in memory. The downside of this amazing utility is that when you use it, Spark transforms into a total memory hog. First, the JVM and YARN (if you’re using it) consume a significant amount of memory, leaving less than you expect for data movement and caching. Next, there’s metadata that accumulates on the driver as a byproduct of shuffle operations and becomes particularly problematic during long-running jobs (multi-day). Finally, Java or Scala classes may introduce hidden overhead in your RDDs. A 10-character Java string may actually consume as much as 60 bytes! To pour salt in the wound, actually tracking down the source of a problem can be nearly impossible since a Spark program may have logs distributed across the cluster, have hundreds of tasks executing per second, and errors may not always propagate all the way up the stack when exceptions are thrown.

Thus, the first thing to do is to tame this unruly beast. For starters, it’s critical to partition wisely, both to manage memory pressure and to ensure complete resource utilization. Next, you must always know your data: its size, its types, and how it’s distributed. This last bit is important since otherwise you may wind up with a skewed distribution of data across partitions; a simple solution for that problem is a custom partitioner. Last, as mentioned above, switch to Kryo serialization, which is faster and more efficient than the default.
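
As a sketch of that last point about skew (this is illustrative, not code from the original project), a custom partitioner can give a known hot key its own partition so skew doesn't pile up in a single task:

```scala
import org.apache.spark.Partitioner

class SkewAwarePartitioner(partitions: Int) extends Partitioner {
  require(partitions > 1, "need at least two partitions")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "hot-user" => 0   // the hypothetical hot key gets a dedicated partition
    case other      => 1 + math.abs(other.hashCode % (numPartitions - 1))
  }
}

// Usage: pairs.partitionBy(new SkewAwarePartitioner(200))
```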

To deal with the issue of accumulating metadata, you have two options. First, you can set the spark.cleaner.ttl parameter to trigger automatic cleanups. However, this will also wipe out any persisted RDDs and I found that it caused problems when trying to subsequently interact with HDFS. The other solution, which I ended up implementing in my case, is to simply split long-running jobs into batches and write intermediate results to disk. This way, you have a fresh environment for every batch and don’t have to worry about metadata build-up.
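
One possible shape for that batching approach is sketched below; the paths, batch boundaries, and parsing logic are placeholders rather than the original pipeline:

```scala
val batchPaths = (0 until 10).map(i => s"hdfs:///data/input/batch_$i")

batchPaths.zipWithIndex.foreach { case (path, i) =>
  val partial = sc.textFile(path)
    .map(line => (line.split(",")(0), 1L))
    .reduceByKey(_ + _)

  // Persist the intermediate result to HDFS so the next batch (or a fresh driver,
  // if you restart the context between batches) can pick up where this one left off.
  partial.saveAsObjectFile(s"hdfs:///data/intermediate/batch_$i")
}
```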

Lesson 2: Avoid Data Movement

In general, I found that avoiding shuffles and minimizing data transfers helped me write programs that ran faster and more reliably. Keep in mind: there are occasional cases when having an extra shuffle can help, such as when you have data that can’t be automatically partitioned into many partitions (see “When More Shuffles are Better” here).

So, how does one avoid shipping data? The obvious answer is to avoid operations that trigger shuffles: repartition and coalesce, ByKey operations (other than counting) like groupByKey and reduceByKey, and join operations like cogroup and join.
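
When a shuffle is unavoidable, the choice of operator still matters. Both calls in the sketch below shuffle, but reduceByKey combines values on the map side first, so far less data crosses the network than with groupByKey:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // ships every (key, value) pair
val viaReduce = pairs.reduceByKey(_ + _)              // ships one partial sum per key per partition
```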

Spark also provides two mechanisms in particular that help us here. Broadcast variables are read-only variables that are cached in memory locally on each machine; they eliminate the need to ship a copy of the variable with every task. Broadcast variables also let you do efficient joins between large and small RDDs, or store a lookup table in memory that provides more efficient retrieval than doing an RDD lookup().
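
A common pattern is the map-side join sketched below (the file layouts and field positions are invented): collect the small table to the driver, broadcast it, and look values up from the large RDD without any shuffle:

```scala
val productNames: Map[Int, String] = sc.textFile("hdfs:///data/products.csv")
  .map(_.split(","))
  .map(f => f(0).toInt -> f(1))
  .collect()                     // small table: safe to pull back to the driver
  .toMap

val namesBc = sc.broadcast(productNames)

val purchases = sc.textFile("hdfs:///data/purchases.csv")
  .map(_.split(","))
  .map(f => (f(0), f(1).toInt))  // (userId, productId)

// O(1) lookup against the broadcast map on each executor; no shuffle, no RDD join.
val enriched = purchases.map { case (user, productId) =>
  (user, namesBc.value.getOrElse(productId, "unknown"))
}
```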

Accumulators are a way to efficiently update a variable in parallel during execution. Accumulators differ from broadcast variables in that their value may only be read on the driver process, but they allow Spark programs to efficiently aggregate results such as counters, sums, or generated lists. An important note about accumulators is that they’re not limited to basic types; you can accumulate any Accumulable class.
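
For instance, a simple counter accumulator can track malformed records without a separate pass over the data (what counts as malformed here is made up):

```scala
val badRecords = sc.accumulator(0L, "badRecords")

val parsed = sc.textFile("hdfs:///data/raw.csv").flatMap { line =>
  val fields = line.split(",")
  if (fields.length < 3) { badRecords += 1L; None }   // count it, keep processing
  else Some((fields(0), fields(2)))
}

parsed.count()                                           // accumulators update as actions run
println(s"Skipped ${badRecords.value} malformed lines")  // the value is readable only on the driver
```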

Lesson 3: Avoiding Data Movement is Hard

The challenge of using the above mechanisms is that, for example, to broadcast an RDD you need to first collect() it on the driver node. To accumulate the results of distributed execution you need to serialize that data back to the driver and aggregate it there. The upshot of this is that all of a sudden you’re increasing the memory pressure on the driver. Between collected RDDs, the persistent metadata discussed previously, and Accumulators, you may quickly run out of memory on the driver. You can, of course, increase the amount of memory allocated by setting spark.driver.memory, but that only works to a degree.

Above, I mentioned a few things: how a smaller number of partitions increases the memory footprint of each partition, and how Spark uses Akka for messaging. If your 2GB RDD is partitioned into only 20 partitions, then to ship one partition’s data to another node you need to move a 100MB chunk via Akka. But, by default, Akka’s frame size is only 10MB! You can get around this by raising spark.akka.frameSize, but this is another example of needing to fundamentally understand your data and how all these moving pieces come together.
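
The corresponding knobs look roughly like this; the values are illustrative, not recommendations:

```scala
import org.apache.spark.SparkConf

// spark.akka.frameSize is in MB and bounds the largest message Akka will send.
val conf = new SparkConf()
  .setAppName("big-partitions")
  .set("spark.akka.frameSize", "128")

// spark.driver.memory must be set before the driver JVM starts, so pass it at
// submit time (e.g. spark-submit --driver-memory 8g) rather than setting it in code.
```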

Lesson 4: Speed!

The first three lessons deal primarily with stability. Next, I want to briefly describe how I got dramatic performance gains.

The first is fairly obvious: liberal use of caching. Of course, you don’t have infinite memory, so cache wisely; but in general, if you use some data twice, cache it. Also, unused RDDs that go out of scope will be automatically unpersisted, but they can be released explicitly with unpersist().

Second, I found broadcast variables extremely useful. I use them regularly for large maps and lookup tables. They have a significant advantage over an RDD that is cached in memory since a lookup in an RDD, even one cached in memory, will still be O(m), where m is the length of data in a single partition. In contrast, a broadcasted hash map will have a lookup of O(1).

Finally, there are times when I had to help Spark parallelize its execution. In my case, I needed to perform a per-key operation between two RDDs—that is, select a key-value pair from the first and perform some computation with that data against the second RDD. In a naive implementation, Spark would process these keys one at a time, so I was of course getting terrible resource utilization on my cluster. I’d have only a few tasks running at a time and most of my executors weren’t used. Because I wasn’t using the RDD API to run this code, I couldn’t benefit from Spark’s built-in parallelism. The ultimate solution was simple: create a thread pool on the driver that would process multiple keys at the same time. Each of the threads would generate tasks that were submitted to YARN and now I could easily ramp up my CPU utilization.
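
Here is a hedged sketch of that idea rather than the original code: a fixed-size thread pool on the driver where each thread submits an independent Spark job for one key. The keys, paths, and RDD contents are placeholders, and an existing SparkContext sc is assumed:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

// Stand-in for the second RDD being processed per key.
val secondRdd = sc.textFile("hdfs:///data/pairs.csv")
  .map(_.split(","))
  .map(f => (f(0), f(1)))

val keys: Seq[String] = Seq("key1", "key2", "key3")   // placeholder keys

val jobs = keys.map { key =>
  Future {
    // Any action here launches its own Spark job; submitting jobs from
    // multiple driver threads is supported, so several run concurrently.
    secondRdd.filter { case (k, _) => k == key }
      .saveAsTextFile(s"hdfs:///output/per-key/$key")
  }
}

Await.result(Future.sequence(jobs), Duration.Inf)
```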

The final lesson here is that Spark can only help you so long as you help it to do so.

Conclusion

Hopefully, the above concept breakdown and lessons (coupled with the provided documentation links) help clarify some of Spark’s underlying complexity. I’ve cited liberally from the sources that helped me most when it came to understanding all these various mechanisms, as well as those that guided me through learning how to configure and use Spark’s more advanced features. I simply hope you find them as useful as I have!

Ilya is a roboticist turned data scientist. After a few years at the University of Michigan building self-discovering robots and another few years working on core DSP software with cell phones and radios at Boeing, he landed in the world of big data at Capital One Labs. He’s an active contributor to the core components of Spark with the goal of learning what it takes to build a next-generation distributed computing platform.


16 responses on “Working with Apache Spark: Or, How I Learned to Stop Worrying and Love the Shuffle”

  1. Subba Rao

    Excellent write-up. For streaming use cases, which can run for months, any ideas on how to go about updating a large reference broadcast variable?

    Have you seen what the performance trade-off is between using broadcast variables and an in-memory DB like Redis/VoltDB?

    1. devendra

      I have pondered this problem for a long time. I think it’s better to avoid broadcast variables for real-time processing and rather go for a DB like Redis. It might have some network trade-offs, but it’s reliable and you can avoid any downtime.

  2. Francis

    “The number of partitions is important because a stage in Spark will operate on one partition at a time (and load the data in that partition into memory). Consequently, if you have fewer partitions than active stages, you will wind up under-utilizing your cluster. ” – There is something here I don’t quite really understand.

    A Task is basically a unit of execution that runs on a single machine. A Stage is a group of tasks, based on partitions, which will perform the same computation in parallel, right? Then why do you say that “a stage in Spark will operate on one partition at a time”? What is the impact of having fewer partitions than active stages?

    Thank you for your answer.

    1. Ilya Ganelin

      Francis – you’re right that the statement is unclear/incorrect. In Spark, you can have one task per partition, but a stage may operate on many partitions simultaneously. The point I was trying to make was that it’s important to manage the number of partitions vs. the number of active executors, which are assigned tasks. Each executor can only run one task at a time, and that task reads from a single partition. Consequently, if you have fewer partitions than available task slots, you will have idle executors and underutilize your cluster resources. Generally, the recommendation is to have 2-3 partitions per executor (and therefore per active task), and to increase this ratio for larger datasets.

      The number of tasks actually assigned per stage is a function of the partitioning of the RDD, which in turn is a function of the parallelism settings for Spark – either the spark.default.parallelism setting or the optional partition-count parameter that certain functions accept.

  3. Tudor-Lucian Lapusan

    Hi Justin,
    Very nice article!
    Right now I’m ‘fighting’ with Spark :) It throws me a very weird exception: MyClass cannot be cast to java.lang.Long (where MyClass is a POJO). I’m using Kryo for serialization.
    The solution was to adjust the number of partitions for my RDDs, but Spark still threw me such a meaningless exception :).

    I really like your final conclusion: “The final lesson here is that Spark can only help you so long as you help it to do so.” I totally agree with you.
    Using the Spark API and operators is very nice and the development speed is very good, BUT making the application run on a big input, and making it scalable and reliable, is hard. You really need to understand deeply how Spark works.
    Hadoop MapReduce, by comparison, is harder to learn and the speed of development is not so fast, BUT it works on big input data without the need to understand deeply how it works.

    I’m pretty sure that Spark will become more scalable and reliable in a short time, without the need to understand deeply how it works ;)

  4. Reader

    Hi, thanks for the article!
    You mentioned that “Because I wasn’t using the RDD API to run this code, I couldn’t benefit from Spark’s built-in parallelism.” I’m curious whether other Spark APIs, especially the DataFrame API, are limited with respect to parallelism (or in other ways).

    I’m currently translating a PostgreSQL script that executes in 30 seconds and creates a lot of temp tables along the way; the same code adapted to Spark SQL not only runs slower but crashes with memory leaks after a minute or so…

    Thanks,

  5. Kamran Saiyed

    Great writeup. You mentioned for parallelism and performance: ” The ultimate solution was simple: create a thread pool on the driver that would process multiple keys at the same time. Each of the threads would generate tasks that were submitted to YARN and now I could easily ramp up my CPU utilization”. Can you please provide an example of how you created such a thread pool?

  6. Elad Amit

    “there’s metadata that accumulates on the driver as a byproduct of shuffle operations and becomes particularly problematic during long-running jobs (multi-day)”
    can you point me at the cause / an open issue-bug / a solution for this?

  8. Tapas Kumar Mahanta

    Hi, could you please elaborate on this: “because a stage in Spark will operate on one partition at a time”? My understanding is that in Spark, within a stage, multiple parallel tasks are performed on multiple partitions of an RDD, and that any map or reduce operation can be called a stage.

    1. Amitabh

      Hi Tapas,
      Yes, you are right! A “stage” contains multiple parallel tasks, and each task operates on a single partition of an RDD. Also, a single stage can contain multiple transformations, unless a transformation requires re-partitioning. For example:
      sc.textFile(“..”).map(..).filter(..).count() —> these transformations will all be inside one stage.
      However, in sc.textFile(“..”).map(..).reduceByKey(..).filter(..).count() the number of stages is 2. The stage split point is reduceByKey(), because it triggers re-partitioning (a shuffle).

  9. Amit Kharb

    I am trying to use Spark in a multi-user environment, where multiple users can submit jobs to Spark at the same time. I have noticed that even when the scheduler is set to “FAIR”, it behaves almost sequentially for a long job.

    Another thing I noticed is that when we leave Spark running for a few days, there is performance degradation: executors take longer to finish the same job. My suspicion is that this is caused by the build-up of metadata.

    Do you have any example of creating a thread pool on the driver side?

    Thanks

    1. Justin Kestelyn Post author

      Amit,

      Please post this question in the “Advanced Analytics (Spark)” area at community.cloudera.com.

  10. David

    Hi,
    thank you for your post! I have a question about Lesson 1. You say that you split long-running jobs into batches and write intermediate results to disk. Do you mean batches within the same Spark context? Or do you call sc.stop() and start with a fresh one?

    Have a great time!
    David

  11. Joe

    The number of products in this article does not make sense. 10 million? I don’t think there are 10 million distinct products in a grocery store, unless “product” here means something else. Perhaps you can clarify?

  12. sasanka ghosh

    I don’t understand why anyone in their right mind would use Spark except for unstructured data, some pre-processing, or historical data processing.

    1. It needs memory (costly), network bandwidth, and a bad data model to perform. Any MPP system from the big players will outperform Spark, and with the cloud, licensing costs are going down.
    2. With the big players you can have a better data model, not a wide, denormalized, limited-functionality table.
    3. With the big players, instead of spending most of your time worrying about memory leaks, parallelism, logs, scheduling, system utilization, etc., you can concentrate on the data, the data model, and the functionality to be delivered.
    4. The amount of time and resources needed to develop, maintain, etc. far outweighs the peace of mind and confidence that any big MPP/in-memory player can give.

    5. For big data processing, OOP concepts are not needed. An unnecessary hotchpotch of OOP and functional style has been incorporated, and to make it worse, in the name of syntactic sugar all kinds of shorthands and confusion have been introduced.
    6. One more thing: can anyone educate me, if you are processing, say, 100 GB of data as 10 million JSON objects, or 10 million rows of a flat table, or any stream of log data, or say a star schema, where do OOP concepts fit in, i.e. inheritance, interfaces, traits, abstract classes, etc.?
    7. Stored procedures, PL/SQL, etc. use C, Java, etc. under the hood and do the very complex work, i.e. they let you concentrate on the functionality; someone has already done that work.