Pig is Flying: Apache Pig on Apache Spark

Our thanks to Mayur Rustagi (@mayur_rustagi), CTO at Sigmoid Analytics, for allowing us to re-publish his post about the Spork (Pig-on-Spark) project below. (Related: Read about the ongoing upstream effort to bring Spark-based data processing to Hive here.)

Analysts can talk about data insights all day (and night), but the reality is that 70% of all data analyst time goes into data processing and not analysis. At Sigmoid Analytics, we want to streamline this data processing pipeline so that analysts can truly focus on value generation and not data preparation.

We focus our efforts on three simple initiatives:

  • Make data processing more powerful
  • Make data processing simpler
  • Make data processing 100x faster than before

As a data mashing platform, our first key initiative is to combine the power and simplicity of Apache Pig on Apache Spark, making existing ETL pipelines 100x faster than before. We do that via a unique mix of our operator toolkit, called DataDoctor, and Spark.

DataDoctor is a high-level operator DSL on top of Spark. It has frameworks for non-symmetrical joins, sorting, grouping, and embedding native Spark functions. It hides a lot of complexity and makes it simple to implement data operators used in applications like Pig and Apache Hive on Spark.

For the uninitiated, Spark is open source Big Data infrastructure that enables distributed, fault-tolerant, in-memory computation. As the kernel for distributed computation, it empowers developers to write testable, readable, and powerful Big Data applications in a number of languages, including Python, Java, and Scala.

How Can I Get Started?

As a user of Apache Pig, the migration effort starts and ends with a single command.

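Assuming Pig’s standard execution-mode switch (the same flag that selects local or MapReduce mode), that command is:

    pig -x spark
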
All your existing UDFs, Pig scripts, and data loaders will work out of the box on Spark — which means you can write simpler, easier-to-develop-and-manage data pipelines on Spark. The Pig REPL is a simple way to speed up your data processing on Spark without any coding, compiling, or development effort. What’s more, you have thousands of Pig UDFs to choose from to bootstrap your ETL process on Spark.
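
For example, a UDF written against Pig’s standard EvalFunc API needs no changes to run in this mode. A minimal sketch (the class and its logic are illustrative, not taken from the project):

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    // An illustrative Pig UDF that upper-cases its first field. Nothing here
    // is Spark-specific: the same class, compiled into the same jar, runs
    // unchanged whether the script executes on MapReduce or on Spark.
    public class UpperCase extends EvalFunc<String> {
        @Override
        public String exec(Tuple input) throws IOException {
            if (input == null || input.size() == 0 || input.get(0) == null) {
                return null;
            }
            return ((String) input.get(0)).toUpperCase();
        }
    }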

High-Level Design

Pig operates in a similar manner to Big Data applications like Hive and Cascading. It has a query language quite akin to SQL that allows analysts and developers to design and write data flows. The query language is translated into a “logical plan” that is further translated into a “physical plan” containing operators. Those operators are then run on the designated execution engine (MapReduce, Apache Tez, and now Spark). There are a whole bunch of details around tracking progress, handling errors, and so on that I will skip here.

Query Planning

Query planning on Spark differs significantly from MapReduce, as Spark handles data wrangling in a much more optimized way. Furthermore, query planning can benefit greatly from the ongoing effort on Catalyst inside Spark. At this moment, we have simply introduced a SparkPlanner that will undertake the conversion from logical to physical plan for Pig. Databricks is working actively to enable Catalyst to handle much of the operator optimizations that will plug into SparkPlanner in the near future. Longer term, we plan to rely on Spark itself for logical plan generation. An early version of this integration has been prototyped in partnership with Databricks.

Pig Launcher

Pig Core hands off Spark execution to SparkLauncher with the physical plan. SparkLauncher creates a SparkContext providing all the Pig dependency jars and Pig itself.
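
A rough sketch of that setup, using Spark’s standard Java API (the app name, master URL, and jar paths are placeholders, not the project’s actual configuration):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class LauncherSketch {
        public static void main(String[] args) {
            // The real SparkLauncher registers every Pig dependency jar;
            // the values below are placeholders.
            SparkConf conf = new SparkConf()
                    .setAppName("Spork")
                    .setMaster("spark://master:7077");
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.addJar("/path/to/pig.jar");   // Pig itself
            sc.addJar("/path/to/udfs.jar");  // user UDFs and loaders
        }
    }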

SparkLauncher gets an MR plan object created from the physical plan. At this point, we replace all the Pig operators with DataDoctor operators recursively in the whole plan. Two iterations are performed over the plan — one that looks at the store operations and recursively travels down the execution tree, and a second that does a breadth-first traversal over the plan and calls convert on each of the operators.
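
A hypothetical sketch of that second pass, written against Pig’s plan-walking API (the converter dispatch, convertOne, is a stand-in for the real lookup logic):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;

    public class PlanWalkSketch {
        // Visit every operator once, breadth-first from the plan's roots,
        // and hand each one to its converter.
        static void convertAll(PhysicalPlan plan) {
            Deque<PhysicalOperator> queue = new ArrayDeque<>(plan.getRoots());
            Set<PhysicalOperator> visited = new HashSet<>();
            while (!queue.isEmpty()) {
                PhysicalOperator op = queue.poll();
                if (!visited.add(op)) continue;  // plans are DAGs; skip repeats
                convertOne(op);                  // dispatch to matching converter
                List<PhysicalOperator> next = plan.getSuccessors(op);
                if (next != null) queue.addAll(next);
            }
        }

        static void convertOne(PhysicalOperator op) { /* dispatch elided */ }
    }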

The base class of converters in DataDoctor is the POConverter class, which defines the abstract method convert, called during plan execution.
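
A sketch of what that contract might look like (the generic signature below is an assumption based on the description, not the project’s actual source):

    import java.io.IOException;
    import java.util.List;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.spark.rdd.RDD;

    public interface POConverter<IN, OUT, T extends PhysicalOperator> {
        // Turn one physical operator into an RDD transformation, given the
        // RDDs produced by the operator's predecessors in the plan.
        RDD<OUT> convert(List<RDD<IN>> predecessors, T physicalOperator)
                throws IOException;
    }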

Interesting Operators

  • LoadOperator: An RDD is created for the data, which can then be used for subsequent transformations. LoadConverter helps load data from HDFS using the Spark API, with parameters initialized from the POLoad operator.
  • StoreOperator: This operator is useful for saving the end results or some intermediate data whenever required. StoreConverter is used to save data to HDFS with parameters from the POStore operator.
  • Local rearrange: LocalRearrangeConverter directly passes data to the POLocalRearrange operator, which in turn transforms it into the required format. This happens through the Spark map API. The local rearrange operator is part of the COGROUP implementation. It has an embedded physical plan that generates tuples of the form (grpKey, (indexed input Tuple)).
  • Global rearrange: GlobalRearrangeConverter is used in the case of a groupBy or a join operation; the converter method uses the groupBy and map APIs from Spark to achieve that. In the case of a groupBy operation, results are converted into the form (key, Iterator(values)); in the case of a COGROUP operation, results are in the form (index, key, value). (The groupBy case is sketched just after this list.)
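
To make the groupBy case concrete, here is roughly what that conversion boils down to in Spark’s Java API (keyOf is a stand-in for the key extraction the physical plan performs):

    import org.apache.pig.data.Tuple;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import scala.Tuple2;

    public class GroupBySketch {
        // Key every tuple, then group: the result pairs each key with an
        // iterable of the tuples that share it -- (key, Iterator(values)).
        static JavaPairRDD<Object, Iterable<Tuple>> groupBy(JavaRDD<Tuple> tuples) {
            JavaPairRDD<Object, Tuple> keyed =
                    tuples.mapToPair(t -> new Tuple2<>(keyOf(t), t));
            return keyed.groupByKey();
        }

        static Object keyOf(Tuple t) { /* key extraction elided */ return null; }
    }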

You can catch the finer details of the migration plan in PIG-4059, or give Pig on Spark a go at our GitHub repo. We know it’s not perfect, so you can file issues here as well while we get Apache JIRA into shape.

Status

I am happy to announce that we have passed 100% of end-to-end test cases on Pig, which means all your Pig code should already run pretty smoothly. Once this work is merged into the Pig repository, you will be able to get builds directly from the Pig website as well.

All this would not have been possible without the hard work from many organizations and people: Praveen R (Sigmoid Analytics), Akhil Das (Sigmoid Analytics), Kamal Banga (Sigmoid Analytics), Anish Haldiya (Sigmoid Analytics), Mayur Rustagi (Sigmoid Analytics), Amit Kumar Behera (Sigmoid Analytics), Mahesh Kalakoti (Sigmoid Analytics), Julien Le Dem (Twitter), Bill Graham (Twitter), Dmitriy Ryaboy (Twitter), Aniket Mokashi (Google), and Greg Owen (Databricks).

Future Plans

Finally, as we merge into Apache Pig, we are focusing on the following enhancements to further improve the speed of Pig:

  • Cache Operator: Adding a new operator to explicitly hint Spark to cache certain datasets for faster execution (the Spark side of this is sketched after this list)
  • Storage Hints: Allowing users to specify the storage location of datasets in Spark for better control of memory
  • YARN and Mesos Support: Adding resource-manager support for broader deployment options
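
In Spark terms, the first two items would presumably map onto RDD persistence; a sketch of the primitives involved (the method and its parameters are illustrative, not a committed design):

    import org.apache.pig.data.Tuple;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.storage.StorageLevel;

    public class CacheHintSketch {
        // A cache operator would mark a dataset for reuse across the plan;
        // a storage hint would choose where its cached partitions live.
        static JavaRDD<Tuple> cacheHint(JavaRDD<Tuple> dataset, boolean spillToDisk) {
            return dataset.persist(spillToDisk
                    ? StorageLevel.MEMORY_AND_DISK()
                    : StorageLevel.MEMORY_ONLY());
        }
    }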

Mayur Rustagi has four years of experience building end-to-end architectures for Big Data applications. He is currently the CTO of Sigmoid Analytics, which is focused on real-time streaming and ETL solutions on Apache Spark.
