Our thanks to Mayur Rustagi (@mayur_rustagi), CTO at Sigmoid Analytics, for allowing us to republish his post about the Spork (Pig-on-Spark) project below. (Related: Read about the ongoing upstream effort to bring Spark-based data processing to Hive here.)
Analysts can talk about data insights all day (and night), but the reality is that 70% of all data analyst time goes into data processing and not analysis. At Sigmoid Analytics, we want to streamline this data processing pipeline so that analysts can truly focus on value generation and not data preparation.
We focus our efforts on three simple initiatives:
- Make data processing more powerful
- Make data processing simpler
- Make data processing 100x faster than before
As a data mashing platform, our first key initiative is to bring the power and simplicity of Apache Pig to Apache Spark, making existing ETL pipelines 100x faster than before. We do that via a unique mix of our operator toolkit, called DataDoctor, and Spark.
DataDoctor is a high-level operator DSL on top of Spark. It has frameworks for non-symmetrical joins, sorting, grouping, and embedding native Spark functions. It hides a lot of complexity and makes it simple to implement data operators used in applications such as Pig and Apache Hive on Spark.
For the uninitiated, Spark is an open source Big Data infrastructure that enables distributed, fault-tolerant, in-memory computation. As the kernel for distributed computation, it empowers developers to write testable, readable, and powerful Big Data applications in a number of languages, including Python, Java, and Scala.
How Can I Get Started?
For Apache Pig users, the migration effort starts and ends with a single command:
pig -x spark
All your existing UDFs, Pig scripts, and data loaders will work out of the box on Spark, which means you can write simpler, easier-to-develop-and-manage data pipelines on Spark. The Pig REPL is a simple way to speed up your data processing on Spark without any coding, compiling, or development effort. What’s more, you have thousands of Pig UDFs to choose from to bootstrap your ETL process on Spark.
Pig operates in a similar manner to Big Data applications like Hive and Cascading. It has a query language quite akin to SQL that allows analysts and developers to design and write data flows. The query language is translated into a “logical plan” that is further translated into a “physical plan” containing operators. Those operators are then run on the designated execution engine (MapReduce, Apache Tez, and now Spark). There are a whole bunch of details around tracking progress, handling errors, and so on that I will skip here.
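The plan pipeline described above can be illustrated with a deliberately tiny Python sketch (nothing here uses Spark or Pig). The operator classes `LogicalLoad`, `LogicalFilter`, and `LogicalStore` and the functions `to_physical` and `run_local` are hypothetical names invented for illustration; Pig's real planner is written in Java and is far more involved. The point is the separation of concerns: logical operators describe what to compute, physical operators are executable steps, and a separate engine runs them.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List

# Hypothetical logical operators: they only describe what to compute.
@dataclass
class LogicalLoad:
    data: List[tuple]

@dataclass
class LogicalFilter:
    predicate: Callable[[tuple], bool]

@dataclass
class LogicalStore:
    sink: List[tuple]

PhysicalOp = Callable[[Iterable[tuple]], Iterator[tuple]]

def to_physical(logical_plan: List[object]) -> List[PhysicalOp]:
    """Translate each logical operator into an executable physical operator
    that consumes a stream of tuples and returns a new stream."""
    physical: List[PhysicalOp] = []
    for op in logical_plan:
        if isinstance(op, LogicalLoad):
            # A load ignores its (empty) input and emits the source data.
            physical.append(lambda _rows, data=op.data: iter(data))
        elif isinstance(op, LogicalFilter):
            physical.append(lambda rows, pred=op.predicate: (r for r in rows if pred(r)))
        elif isinstance(op, LogicalStore):
            def store(rows: Iterable[tuple], sink: List[tuple] = op.sink) -> Iterator[tuple]:
                sink.extend(rows)  # materialize the final results
                return iter(sink)
            physical.append(store)
        else:
            raise ValueError(f"unsupported logical operator: {op!r}")
    return physical

def run_local(physical_plan: List[PhysicalOp]) -> List[tuple]:
    """A stand-in 'execution engine' that runs the operators in order.
    MapReduce, Tez, or Spark would each execute the same plan differently."""
    rows: Iterable[tuple] = iter(())
    for op in physical_plan:
        rows = op(rows)
    return list(rows)

# Usage: LOAD -> FILTER -> STORE
results: List[tuple] = []
plan = [
    LogicalLoad([("a", 1), ("b", 5), ("c", 7)]),
    LogicalFilter(lambda r: r[1] > 2),
    LogicalStore(results),
]
run_local(to_physical(plan))
print(results)  # [('b', 5), ('c', 7)]
```

Because `run_local` only sees physical operators, swapping in a different engine would not require touching the logical plan, which is the property that lets Pig target MapReduce, Tez, and Spark from the same scripts.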
Query planning on Spark will vary significantly from that on MapReduce, as Spark handles data wrangling in a much more optimized way. Furthermore, query planning can benefit greatly from the ongoing effort on Catalyst inside Spark. At this moment, we have simply introduced a SparkPlanner that undertakes the conversion from logical to physical plan for Pig. Databricks is actively working to enable Catalyst to handle many of the operator optimizations that will plug into SparkPlanner in the near future. Longer term, we plan to rely on Spark itself for logical plan generation. An early version of this integration has been prototyped in partnership with Databricks.
Pig Core hands off Spark execution to SparkLauncher with the physical plan. SparkLauncher creates a SparkContext, providing all the Pig dependency jars and Pig itself.
SparkLauncher gets an MR plan object created from the physical plan. At this point, we recursively replace all the Pig operators in the whole plan with DataDoctor operators. Two iterations are performed over the plan: the first looks at the store operations and recursively travels down the execution tree, and the second does a breadth-first traversal over the plan and calls convert on each of the operators.
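The two passes could be sketched in Python as follows. `Op`, `collect_from_stores`, and `convert_breadth_first` are hypothetical names, not Pig or DataDoctor classes. The breadth-first pass here uses Kahn-style ordering (an operator is queued only once all of its inputs are converted); that is an assumption made so that `convert` always receives already-converted inputs, and the real SparkLauncher may order things differently.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Dict, List

@dataclass(eq=False)  # eq=False keeps identity-based hashing, so ops can be dict keys
class Op:
    name: str
    inputs: List["Op"] = field(default_factory=list)

def collect_from_stores(stores: List[Op]) -> List[Op]:
    """Pass 1: start at each store operator and recursively walk down to
    its inputs, collecting every reachable operator exactly once."""
    seen = set()
    order: List[Op] = []

    def visit(op: Op) -> None:
        if op in seen:
            return
        seen.add(op)
        for parent in op.inputs:
            visit(parent)
        order.append(op)  # post-order: an op's inputs are recorded before it

    for root in stores:
        visit(root)
    return order

def convert_breadth_first(ops: List[Op], convert: Callable[[Op, list], object]) -> Dict[Op, object]:
    """Pass 2: breadth-first traversal starting from operators with no inputs.
    An operator is converted only after all of its inputs have been."""
    remaining = {op: len(op.inputs) for op in ops}
    consumers: Dict[Op, List[Op]] = {op: [] for op in ops}
    for op in ops:
        for parent in op.inputs:
            consumers[parent].append(op)

    queue = deque(op for op in ops if remaining[op] == 0)
    converted: Dict[Op, object] = {}
    while queue:
        op = queue.popleft()
        converted[op] = convert(op, [converted[p] for p in op.inputs])
        for child in consumers[op]:
            remaining[child] -= 1
            if remaining[child] == 0:
                queue.append(child)
    return converted

# Usage: two loads feed a join, which feeds a store.
left, right = Op("load_a"), Op("load_b")
join = Op("join", [left, right])
store = Op("store", [join])

visit_order: List[str] = []

def record(op: Op, inputs: list) -> str:
    visit_order.append(op.name)
    return f"rdd({op.name})"  # placeholder for the converted dataset

plan_ops = collect_from_stores([store])
converted = convert_breadth_first(plan_ops, record)
print(visit_order)  # ['load_a', 'load_b', 'join', 'store']
```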
The base class of converters in DataDoctor is the POConverter class, which defines the abstract method convert, called during plan execution.
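For readers more at home in Python, the converter contract can be mirrored with an abstract base class. This is a hedged analogue, not DataDoctor's code: the real converters are Java classes operating on Spark RDDs, while here plain lists stand in for RDDs, and `FilterConverter` and its `predicate` attribute are hypothetical.

```python
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, Generic, Iterable, List, TypeVar

IN = TypeVar("IN")
OUT = TypeVar("OUT")

class POConverter(ABC, Generic[IN, OUT]):
    """Python analogue of a converter base class: a subclass turns one
    physical operator, plus its already-converted inputs, into an output dataset."""

    @abstractmethod
    def convert(self, inputs: List[Iterable[IN]], physical_op: Any) -> List[OUT]:
        """Called once per operator during plan execution."""

class FilterConverter(POConverter[tuple, tuple]):
    """Hypothetical converter for a filter operator carrying a `predicate`."""

    def convert(self, inputs: List[Iterable[tuple]], physical_op: Any) -> List[tuple]:
        (rows,) = inputs  # a filter has exactly one input
        return [row for row in rows if physical_op.predicate(row)]

# Usage: a stand-in physical operator with a predicate attribute.
filter_op = SimpleNamespace(predicate=lambda row: row[1] > 2)
print(FilterConverter().convert([[("a", 1), ("b", 3)]], filter_op))  # [('b', 3)]

# The abstract base itself cannot be instantiated.
try:
    POConverter()
    base_is_abstract = False
except TypeError:
    base_is_abstract = True
print(base_is_abstract)  # True
```

Keeping one converter per operator type means the plan walker only needs to look up the right converter and call `convert`, without knowing anything about the operator's internals.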
LoadOperator: An RDD is created for the loaded data, which can be used for subsequent transformations.
LoadConverter helps load data from HDFS using the Spark API with parameters initialized from
StoreOperator: This operator is useful for saving the end results or some intermediate data whenever required.
StoreConverter is used to save data to HDFS with parameters from
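A minimal sketch of the load and store steps, with local files standing in for HDFS and lists of lists standing in for a partitioned RDD. The function names, the tab delimiter (which matches PigStorage's default), and the chunking strategy are assumptions for illustration; a real loader works from input splits rather than chunking lines after reading them, and the part-NNNNN naming only mirrors the style Spark and Hadoop use for output files.

```python
import os
import tempfile
from typing import List

Partitions = List[List[tuple]]

def load_converter(path: str, num_partitions: int = 2, delimiter: str = "\t") -> Partitions:
    """Read a delimited text file into contiguous partitions of tuples
    (a stand-in for creating an RDD from an HDFS path)."""
    with open(path, encoding="utf-8") as f:
        rows = [tuple(line.rstrip("\n").split(delimiter)) for line in f]
    size = max(1, -(-len(rows) // num_partitions))  # ceiling division
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def store_converter(partitions: Partitions, out_dir: str, delimiter: str = "\t") -> List[str]:
    """Write each partition to its own part-NNNNN file, one tuple per line."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for idx, part in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{idx:05d}")
        with open(path, "w", encoding="utf-8") as f:
            for row in part:
                f.write(delimiter.join(str(value) for value in row) + "\n")
        paths.append(path)
    return paths

# Usage: round-trip a small tab-separated file through load and store.
with tempfile.TemporaryDirectory() as workdir:
    src = os.path.join(workdir, "input.tsv")
    with open(src, "w", encoding="utf-8") as f:
        f.write("a\t1\nb\t2\nc\t3\n")
    loaded = load_converter(src)
    written = [os.path.basename(p) for p in store_converter(loaded, os.path.join(workdir, "out"))]
    with open(os.path.join(workdir, "out", "part-00001"), encoding="utf-8") as f:
        last_part = f.read()

print(loaded)           # [[('a', '1'), ('b', '2')], [('c', '3')]]
print(written)          # ['part-00000', 'part-00001']
print(repr(last_part))  # 'c\t3\n'
```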
- Local rearrange: LocalRearrangeConverter directly passes data to POLocalRearrangeConverter, which in turn transforms data into the required format. This happens through the Spark map API. The local rearrange operator is part of the COGROUP implementation. It has an embedded physical plan that generates tuples of the form (grpKey, (indexed input tuple)).
- Global rearrange: GlobalRearrangeConverter is used in the case of a groupBy operation or a join operation; the converter method uses map APIs from Spark to achieve that. In the case of a groupBy operation, results are converted into the form (key, Iterator(values)). In the case of a COGROUP operation, results are in the form (index, key, value).
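The local and global rearrange steps can be sketched in Python to show how COGROUP falls out of them. Everything here is hypothetical (`local_rearrange`, `global_rearrange`, `package_cogroup`): a Python dict plays the role of Spark's shuffle, and the final packaging into one bag per input follows Pig's usual COGROUP output shape rather than any specific DataDoctor class. With a single input, the same pipeline yields the (key, values) groups a plain groupBy produces.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Tagged = Tuple[object, Tuple[int, tuple]]

def local_rearrange(rows: Iterable[tuple], index: int,
                    key_fn: Callable[[tuple], object]) -> Iterator[Tagged]:
    """Map step: tag each tuple with its group key and input index,
    producing (grpKey, (index, tuple))."""
    return ((key_fn(row), (index, row)) for row in rows)

def global_rearrange(*tagged_inputs: Iterable[Tagged]) -> Dict[object, List[Tuple[int, tuple]]]:
    """Shuffle step: bring together all tagged tuples that share a key."""
    groups: Dict[object, List[Tuple[int, tuple]]] = defaultdict(list)
    for tagged in tagged_inputs:
        for key, indexed in tagged:
            groups[key].append(indexed)
    return dict(groups)

def package_cogroup(groups: Dict[object, List[Tuple[int, tuple]]], num_inputs: int) -> list:
    """Split each key's tuples back into one bag per input, in key order."""
    result = []
    for key in sorted(groups):
        bags: List[List[tuple]] = [[] for _ in range(num_inputs)]
        for index, row in groups[key]:
            bags[index].append(row)
        result.append((key, *bags))
    return result

# Usage: COGROUP two relations on their first field.
owners = [("alice", "cat"), ("bob", "dog"), ("alice", "fish")]
ages = [("alice", 30), ("carol", 41)]
tagged_a = local_rearrange(owners, 0, lambda r: r[0])
tagged_b = local_rearrange(ages, 1, lambda r: r[0])
cogrouped = package_cogroup(global_rearrange(tagged_a, tagged_b), num_inputs=2)
for row in cogrouped:
    print(row)
# ('alice', [('alice', 'cat'), ('alice', 'fish')], [('alice', 30)])
# ('bob', [('bob', 'dog')], [])
# ('carol', [], [('carol', 41)])
```

Tagging each tuple with its input index in the map step is what allows a single shuffle to serve both groupBy and joins: the grouping itself does not care how many inputs there were, and the index is only used afterwards to rebuild the per-input bags.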
You can catch the finer details of the migration plan in PIG-4059 or give Pig on Spark a go at our GitHub repo. We know it’s not perfect, so you can file issues here as well while we get Apache JIRA into shape.
I am happy to announce that we have passed 100% of the end-to-end test cases on Pig, which means all your Pig code should already run pretty smoothly. Once the work is merged into the Pig repository, you will be able to get builds directly from the Pig website as well.
All this would not have been possible without the hard work from many organizations and people: Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid Analytics), Kamal Banga (Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics), Mayur Rustagi (Sigmoid Analytics), Amit Kumar Behera (Sigmoid Analytics), Mahesh Kalakoti (Sigmoid Analytics), Julien Le Dem (Twitter), Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Aniket Mokashi (Google), and Greg Owen (Databricks).
Finally, as we merge to Apache Pig, we are focusing on the following enhancements to further improve the speed of Pig:
- Cache Operator: Adding a new operator to explicitly hint Spark to cache certain datasets for faster execution
- Storage Hints: Allowing users to specify the storage location of datasets in Spark for better control of memory
- YARN and Mesos Support: Adding resource manager support for broader deployment options
Mayur Rustagi has four years of experience in building end-to-end architecture for big data applications. He is currently the CTO of Sigmoid Analytics, which is focused on Real Time Streaming & ETL solutions on Apache Spark.