Two of the most vibrant communities in the Apache Hadoop ecosystem are now working together to bring users a Hive-on-Spark option that combines the best elements of both.
(Editor’s note [April 12, 2016]: Hive-on-Spark is now GA/ready for production as of CDH 5.7.)
Apache Hive is a popular SQL interface for batch processing and ETL using Apache Hadoop. Until recently, MapReduce was the only execution engine in the Hadoop ecosystem, and Hive queries could only run on MapReduce. But today, alternative execution engines to MapReduce are available — such as Apache Spark and Apache Tez (incubating).
Although Spark is relatively new to the Hadoop ecosystem, its adoption has been meteoric. An open-source data analytics cluster computing framework, Spark is built outside of Hadoop’s two-stage MapReduce paradigm but runs on top of HDFS. Because of its successful approach, Spark has quickly gained momentum and become established as an attractive choice for the future of data processing in Hadoop.
In this post, you’ll get an overview of the motivations and technical details behind some very exciting news for Spark and Hive users: the fact that the Hive and Spark communities are joining forces to collaboratively introduce Spark as a new execution engine option for Hive, alongside MapReduce and Tez (see HIVE-7292).
Motivation and Approach
Here are the two main motivations for enabling Hive to run on Spark:
- To improve the Hive user experience
Hive queries will run faster, thereby improving user experience. Furthermore, users will have access to a robust, non-MR execution engine that has already proven itself to be a leading option for data processing as well as streaming, and which is among the most active projects across all of Apache from contributor and commit standpoints.
- To streamline operational management for Spark shops
Hive-on-Spark will be very valuable for users who are already using Spark for other data-processing and machine-learning needs. Standardizing on one execution back end is also convenient for operational management, making it easier to debug issues and create enhancements.
Superficially, this project’s goals look similar to those of Shark or Spark SQL, which are separate projects that reuse the Hive front end to run queries using Spark. However, this design adds Spark into Hive, parallel to MapReduce and Tez, as another backend execution engine. Thus, existing Hive jobs continue to run as-is transparently on Spark.
The key advantage of this approach is to leverage all the existing integration on top of Hive, including ODBC/JDBC, auditing, authorization, and monitoring. Another advantage is that it will have no impact on Hive’s existing code path and thus no functional or performance effects. Users choosing to run Hive on either MapReduce or Tez will have the same functionality and code paths they have today — thus, the Hive user community will be in the great position of being able to choose among MapReduce, Tez, or Spark as a back end. In addition, maintenance costs will be minimized so the Hive community needn’t make specialized investments for Spark.
Meanwhile, users opting for Spark as the execution engine will automatically have all the rich functional features that Hive provides. Future features (such as new data types, UDFs, logical optimization, and so on ) added to Hive should become automatically available to those users, without any customization work to be done in Hive’s Spark execution engine.
To use Spark as an execution engine in Hive, you would set the following:
The default value for this configuration is still “
mr”. Hive will continue to work on MapReduce as-is on clusters that don’t have Spark on them. When Spark is configured as Hive’s execution, a few configuration variables will be introduced, such as the master URL of the Spark cluster.
The new execution engine should support all Hive queries without any modification. Query results should be functionally equivalent to those from either MapReduce or Tez. In addition, existing functionality related to the execution engine should also be available, including the following:
- Hive will display a task execution plan that’s similar to that being displayed by the
explaincommand for MapReduce and Tez.
- Hive will give appropriate feedback to the user about progress and completion status of the query when running queries on Spark.
- The user will be able to get statistics and diagnostic information as before (counters, logs, and debug info on the console).
As noted previously, this project takes a different approach from that of Shark in that SQL semantics will be not implemented using Spark’s primitives, but rather MapReduce ones that will be executed in Spark.
The main work to implement the Spark execution engine for Hive has two components: query planning, where Hive operator plan from semantic analyzer is further translated a task plan that Spark can execute; and query execution, where the generated Spark plan is executed in the Spark cluster. There are other miscellaneous yet indispensable functional pieces involving monitoring, counters, statistics, and so on, but for brevity, we will only address the main design considerations here.
Currently, for a given user query, Hive’s semantic analyzer generates an operator plan that comprises a graph of logical operators such as
GroupByOperator, and so on.
MapReduceCompiler compiles a graph of
MapReduceTasks and other helper tasks (such as
MoveTask) from the logical operator plan. Tez behaves similarly, yet generates a
TezTask that combines otherwise multiple MapReduce tasks into a single Tez task.
For Spark, we will introduce
SparkCompiler parallel to
TezCompiler. Its main responsibility is to compile from the Hive logical operator plan a plan that can be executed on Spark. Thus, we will have
SparkTask, depicting a job that will be executed in a Spark cluster, and
SparkWork, describing the plan of a Spark task. Thus,
SparkCompiler translates a Hive’s operator plan into a
SparkWork instance. During the task plan generation,
SparkCompiler may also perform physical optimizations suitable for Spark.
SparkTask instance can be executed by Hive’s task execution framework in the same way as for other tasks. Internally, the
SparkTask.execute() method will make RDDs and functions out of a
SparkWork instance, and submit the execution to the Spark cluster via a Spark client.
Once the Spark work is submitted to the Spark cluster, the Spark client will continue to monitor the job execution and report progress. A Spark job can be monitored via
SparkListener APIs, we will add a
SparkJobMonitor class that handles printing of status as well as reporting the final result. This class provides similar functions as
HadoopJobExecHelper used for MapReduce processing, or
TezJobMonitor used for Tez job processing, and will also retrieve and print the top-level exception thrown at execution time in case of job failure.
Spark job submission is done via a
SparkContext object that’s instantiated with the user’s configuration. When a
SparkTask is executed by Hive, such a context object is created in the current user session. With the context object, RDDs corresponding to Hive tables are created and
ReduceFunction (more details below) are built from Hive’s
SparkWork and applied to the RDDs. Job execution is triggered by applying a
foreach() transformation on the RDDs with a dummy function.
Main Design Considerations
Hive’s operator plan is based on MapReduce paradigm, and traditionally, a query’s execution contains a list of MapReduce jobs. Each MapReduce job consists of map-side processing starting from Hive’s
ExecMapper and reduce-side processing starting from
ExecReducer, and MapReduce provides inherent shuffling, sorting, and grouping between the map-side and the reduce-side. The input to the whole processing pipeline are the folders and files corresponding to the table.
Because we will reuse Hive’s operator plan but perform the same data processing in Spark, the execution plan will be built in Spark constructs such as RDD, function, and transformation. This approach is outlined below.
Table as RDD
A Hive table is simply a bunch of files and folders on HDFS. Spark primitives are applied to RDDs. Thus, naturally, Hive tables will be treated as RDDs in the Spark execution engine.
The above mentioned
MapFunction will be made from
MapWork; specifically, the operator chain starting from
ExecMapper class implements MapReduce Mapper interface, but the implementation in Hive contains some code that can be reused for Spark. Therefore, we will extract the common code into a separate class,
MapperDriver, to be shared by MapReduce as well as Spark.
ReduceFunction will be made of
ReduceWork instance from
SparkWork. To Spark,
ReduceFunction is no different than
MapFunction, but the function’s implementation will be different, and made of the operator chain starting from
ExecReducer.reduce(). Also, because some code in
ExecReducer will be reused, we will extract the common code into a separate class,
ReducerDriver, for sharing by both MapReduce and Spark.
Shuffle, Group, and Sort
While this functionality comes for “free” along with MapReduce, we will need to provide an equivalent for Spark. Fortunately, Spark provides a few transformations that are suitable for replacing MapReduce’s shuffle capability, such as
partitionBy does pure shuffling (no grouping or sorting),
groupByKey does shuffling and grouping, and
sortByKey() does shuffling plus sorting. Therefore, for each
SparkWork, we will need to inject one of the transformations.
Having the capability to selectively choose the exact shuffling behavior provides opportunities for optimization. For instance, Hive’s
groupBy doesn’t require the key to be sorted, but MapReduce does. In contrast, in Spark, one can choose
sortByKey only if key order is important (such as for SQL
Multiple Reduce Stages
Whenever a query has multiple
ReduceSinkOperator instances, Hive will break the plan apart and submit one MR job per sink. All the MR jobs in this chain need to be scheduled one-by-one, and each job has to re-read the output of the previous job from HDFS and shuffle it. In Spark, this step is unnecessary: multiple map functions and reduce functions can be concatenated. For each
ReduceSinkOperator, a proper shuffle transformation needs to be injected as explained above.
Based on the above, you will likely recognize that although Hive on Spark is simple and clean in terms of functionality and design, the implementation will take some time. Therefore, the community will take a phased approach, with all basic functionality delivered in a Phase 1 and optimization and improvements ongoing over a longer period of time. (Precise number of phases and what each will entail are under discussion.)
Most important, the Hive and Spark communities will work together closely to achieve this technical vision and resolve any obstacles that might arise — with the end result being the availability to Hive users of an execution engine that improves performance as well as unifies batch and stream processing.
We invite you to follow our progress, on which we will offer periodic updates!
Xuefu Zhang is a Software Engineer at Cloudera and a Hive PMC member.