Achieving a 300% speedup in ETL with Apache Spark

Categories: Data Ingestion, General, Hadoop, HDFS, Spark

A common design pattern often emerges when teams begin to stitch together existing systems and an EDH cluster: file dumps, typically in a format like CSV, are regularly uploaded to the EDH, where they are then unpacked, transformed into an optimal query format, and tucked away in HDFS where various EDH components can use them. When these file dumps are large or arrive very often, these simple steps can significantly slow down an ingest pipeline. Part of this delay is inevitable; moving large files across the network is time-consuming because of physical limitations and can’t readily be sped up. However, the rest of the basic ingest workflow described above can often be improved.

Let’s establish a simple use case for file processing in EDH: a directory of CSV files exists in hdfs:///user/example/zip_dir/, but has been compressed into raw *.zip files. To make them usable, they need to be extracted and compacted into a single text file which will be placed into hdfs:///user/example/quoteTable_csv/. Since these are CSV files, we’ll assume that each has a simple header located on its first line. One common way to do this is by executing a script similar to the one detailed below on an EDH “edge node” — a node in a cluster which has all of the required configuration files and libraries for applications to interact with the rest of the cluster. Details on the edge node and cluster that we are using for these examples can be found below in the section titled Cluster Details.
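The script itself was embedded in the original post and is not reproduced here. The sketch below reconstructs it under a few assumptions: per the comment thread it pulled files down with `hdfs dfs -copyToLocal` (not `distcp`), and the local directory names and cleanup steps are illustrative.

```shell
#!/usr/bin/env bash
# Sketch of the edge-node ingest script: pull zips out of HDFS, unzip locally,
# strip each CSV header, concatenate, and push the result back into HDFS.
ingest_zips() {
  mkdir -p ./zips ./files
  hdfs dfs -copyToLocal "hdfs:///user/example/zip_dir/*.zip" ./zips/
  for zip in ./zips/*.zip; do
    unzip -o "$zip" -d ./files/
  done
  # drop the one-line header from every CSV, then concatenate
  for csv in ./files/*.csv; do
    tail -n +2 "$csv" >> ./quotes.csv
  done
  hdfs dfs -put ./quotes.csv hdfs:///user/example/quoteTable_csv/
  rm -r ./zips ./files ./quotes.csv
}

# only run when the Hadoop CLI is actually available
if command -v hdfs >/dev/null 2>&1; then
  ingest_zips
fi
```

Note that every byte is copied out of HDFS to the edge node's local disk and then copied back in, which is exactly the round trip the rest of the article eliminates.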

 

The following diagram shows the basic flow of this solution, where each arrow between blocks represents data being copied from the left-hand block to the right-hand block. Purple arrows mark steps where computations are performed on the data, and red arrows mark steps where the data is simply copied.

[Diagram: the edge-node ingest workflow]

Although this solution is familiar and easy to implement, it’s obvious that there is a bottleneck present: all of the downloading, unzipping, and concatenation happens sequentially on a single edge node. On our example cluster, this script took 125 seconds to complete over zip files containing 10,000,000 records.

A better way

By leveraging Spark for distribution, we can achieve the same results much more quickly and with the same amount of code. By keeping the data in HDFS throughout the process, we were able to ingest the same data as before in about 36 seconds. Let’s take a look at the Spark code which produced results equivalent to the bash script shown above — note that a more parameterized version of this code, and of all code referenced in this article, can be found below in the Resources section.
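The original code listing was embedded in the post and is not preserved here. A minimal Scala sketch of such a job, using the paths from the use case above; the decompression logic is illustrative rather than the article's exact code:

```scala
import java.util.zip.ZipInputStream
import org.apache.spark.sql.SparkSession
import scala.io.Source

object ZipIngest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ZipIngest").getOrCreate()

    // Each *.zip arrives as (path, stream); decompression happens on the
    // executors, so the data never funnels through a single edge node.
    spark.sparkContext
      .binaryFiles("hdfs:///user/example/zip_dir/")
      .flatMap { case (_, stream) =>
        val zis = new ZipInputStream(stream.open())
        Iterator.continually(zis.getNextEntry)
          .takeWhile(_ != null)
          .flatMap { _ =>
            // materialize each entry's lines before getNextEntry moves on;
            // drop(1) skips the one-line CSV header
            Source.fromInputStream(zis).getLines().drop(1).toList
          }
      }
      .saveAsTextFile("hdfs:///user/example/quoteTable_csv/")

    spark.stop()
  }
}
```

Submitted with `spark-submit`, the unzip work is spread across every executor in the cluster rather than serialized on one machine.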

This program, submitted to the cluster, is illustrated in the following diagram:

[Diagram: the Spark-based ingest workflow]

As shown below, by moving this ingest workload from an edge node script to a Spark application, we saw a significant speed boost — the average time taken to unzip our files on the example cluster decreased to 35.7 seconds, which is equivalent to a speedup of more than 300%. The following graph shows the results of running these two workflows over several different inputs:

[Graph: edge-node script vs. Spark runtimes over several input sizes]

Over the larger dataset, the Spark workflow consistently finished more than 900% faster than the simple bash workflow. Now we will examine a more complicated workflow that involves processing the files after they’re decompressed. In this workflow, rows from the compressed *.csv files in hdfs:///user/example/zip_dir/ are to be unzipped and placed into the Impala table quoteTable, which is backed by parquet files at hdfs:///user/example/quoteTable/. Additionally, let’s filter out certain rows based on their value. Our previous bash script could still be used, alongside a call to Impala to convert the *.csv files to parquet:
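The script embed is missing here. A sketch of the Impala step, assuming the unzip-and-concatenate steps from the earlier script have already landed CSV data in a text-backed staging table; the staging table name (quoteTable_csv_staging) and the `value > 0` filter are illustrative assumptions:

```shell
#!/usr/bin/env bash
# After the earlier edge-node script has loaded the CSV data, hand it to
# Impala to filter the rows and convert the result to parquet.
convert_to_parquet() {
  impala-shell -q "
    INSERT OVERWRITE TABLE quoteTable
    SELECT * FROM quoteTable_csv_staging
    WHERE value > 0;"
  impala-shell -q "COMPUTE STATS quoteTable;"
}

# only run when the Impala CLI is actually available
if command -v impala-shell >/dev/null 2>&1; then
  convert_to_parquet
fi
```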

Although Impala performs the conversion and filter rather quickly, this common pattern still involves copying the data across HDFS. This problem is illustrated in the following diagram which depicts this new workflow:

[Diagram: the bash + Impala workflow]

Running the bash script defined above took 138.5 seconds over our dataset. By contrast, we can modify our Spark job from above — rewritten below with new functionality — to achieve the same thing:

[Code: the Spark job, modified to filter rows and write parquet]
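The screenshot of the rewritten job is not recoverable. A sketch of how the earlier Spark job might be extended; the two-column (symbol, price) schema and the `price > 0` filter are assumptions for illustration, not the article's actual code:

```scala
import java.util.zip.ZipInputStream
import org.apache.spark.sql.SparkSession
import scala.io.Source

case class Quote(symbol: String, price: Double)

object ZipToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("ZipToParquet").getOrCreate()
    import spark.implicits._

    val rows = spark.sparkContext
      .binaryFiles("hdfs:///user/example/zip_dir/")
      .flatMap { case (_, stream) =>
        val zis = new ZipInputStream(stream.open())
        Iterator.continually(zis.getNextEntry)
          .takeWhile(_ != null)
          .flatMap(_ => Source.fromInputStream(zis).getLines().drop(1).toList)
      }
      .map(_.split(','))
      .map(fields => Quote(fields(0), fields(1).toDouble))
      .filter(_.price > 0) // filtered rows are never written to disk at all

    // a single write, straight to the parquet files backing the Impala table
    rows.toDS().write.parquet("hdfs:///user/example/quoteTable/")
    spark.stop()
  }
}
```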

Diagrammed, this program doesn’t look any different from before — the “process” step is more intensive, as it now includes filtering and conversion as well as unzipping, but the data is not written to disk any additional times. As an additional bonus, data that is filtered out is never copied to disk at all, which is not the case in our previous solution.

This Spark job completed in 64 seconds, over 200% faster than the bash script based solution. On the larger 100M record datasets, our Spark job was over 300% faster. Our cluster’s datanodes only had 2 disks each, and enough cores to support 2 single-core executors each. With significantly more powerful datanodes, Spark’s support for multithreaded writing to parquet would show even larger gains over Impala for workloads like ours. Even on a small cluster, Spark’s performance advantages are clear:

[Graph: Spark vs. bash + Impala runtimes]

Once Spark has loaded information into a DataFrame, it’s easy to perform any additional transformations in-memory. In our final example, let’s imagine a more complex pipeline: multiple columns now exist in our dataset, with two quote-enclosed string columns that may contain our delimiter (‘,’), an integer column which needs to be clamped between -100 and 100, a double column which needs to be squared, and several simple filters which need to be applied. We’ll use the Apache Commons CSV library to easily handle parsing this more complex input. A Spark implementation of this process is shown below:
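As with the earlier listings, the original code is not preserved. A sketch of such a job using the Apache Commons CSV library; the column names (name, note, rank, value) and the concrete filters are illustrative assumptions, and the per-line parsing assumes no embedded newlines inside quoted fields:

```scala
import java.io.StringReader
import org.apache.commons.csv.CSVFormat
import org.apache.spark.sql.SparkSession
import scala.collection.JavaConverters._

case class Quote(name: String, note: String, rank: Int, value: Double)

object CsvTransform {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CsvTransform").getOrCreate()
    import spark.implicits._

    val quotes = spark.sparkContext
      .textFile("hdfs:///user/example/quoteTable_csv/")
      .flatMap { line =>
        // Commons CSV copes with quoted fields that contain the ',' delimiter
        CSVFormat.DEFAULT.parse(new StringReader(line)).getRecords.asScala
      }
      .map { record =>
        Quote(
          record.get(0),
          record.get(1),
          math.max(-100, math.min(100, record.get(2).toInt)), // clamp to [-100, 100]
          math.pow(record.get(3).toDouble, 2))                // square the double column
      }
      .filter(q => q.name.nonEmpty && q.rank != 0) // simple example filters

    quotes.toDS().write.parquet("hdfs:///user/example/quoteTable/")
    spark.stop()
  }
}
```

Because the parsing, clamping, squaring, and filtering all happen in one in-memory pass on the executors, no intermediate copy of the data ever touches disk.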

 

This final test, because it involves writing in more compact datatypes, finished significantly faster than the previous test. Our Spark workflow finished in 52 seconds, and involved significantly less code than the traditional solution, which finished in 148 seconds. The graph below shows runtimes for this example over the same datasets used in previous examples:

[Graph: runtimes for the final example over the same datasets]

As you can see above, our example ingest workflow was markedly faster than a more intuitive solution which uses bash and Impala — and the difference in speed only gets larger as the size of the input grows. By leveraging Spark to its full potential to concisely perform distributed computations and to execute custom or third-party code in a distributed fashion, our ingest process in the final example was sped up by over 600% when compared to a more obvious implementation. Now that you’ve seen the basics, consider how Spark could speed up your own ETL!

Cluster Details

Hardware: 6 EC2 c3.xlarge nodes

CDH version: 5.8.2

 

Resources

Scripts

Spark

Results  



7 responses on “Achieving a 300% speedup in ETL with Apache Spark”

  1. Gord

    Thanks for the post. The Spark code is a nice example of ETL in Spark.
    A couple of comments:
    1) It sounds like line 4 of the shell script should be “hadoop fs -get” instead of “hadoop distcp”. The distcp command is actually making a copy to a ./zips directory on HDFS, not to a local ./zips directory on the edge node.
    2) The shell script is doing all the processing locally and sequentially on the edge node. To make the comparison with Spark meaningful, the cluster should be leveraged for parallelism, e.g. using Pig.

    1. Eric Maynard

      Hi Gord, thanks for the reply.

      You are indeed right about the use of ‘hadoop distcp’ being wrong here. If you examine the scripts linked in the resources section you’ll see that I used ‘hdfs dfs -copyToLocal’ instead in the scripts which actually ran on the cluster. I reckon that the calls to distcp are left over from an older version and that the change in the testing script never made it to the prettier for-show script which is in the article. Good catch!

      As for the performance of the bash scripts, you’re very correct in saying that they could benefit from parallelism. Naturally, using another framework (e.g. Pig or MapReduce) to parallelize the workload would perform better than the script shown — although I would wager that Spark would still handily outperform those frameworks for the use case given. In this blog I was not really attempting to compare various parallelization frameworks, but rather to show how one (Spark) can be used to tackle a simple use case without introducing much complexity vs. a simple bash script. I hope it’s been helpful in that regard.

      1. Gord Arnon

        Sure, the post is helpful in showcasing Spark.
        What I was trying to get at is that the bulk of the “300%” speedup mentioned in the title is not because of Spark, but because the baseline for the comparison is a sequential script.
        So in a sense, the 300% is a bit misleading. Makes for a catchy title though ;-)

  2. Tom Reid

    Great article. I had a question though. If we want to write each unzipped file as a separate file, that’s quite straightforward to do using the local file system, e.g. iter.mkString then use PrintWriter. I was hoping to be able to do something similar using saveAsTextFile to write to HDFS, but it’s not as straightforward to get the data in the iter into an RDD. Any suggestions?
