Apache Hive on Apache Spark: Motivations and Design Principles

Two of the most vibrant communities in the Apache Hadoop ecosystem are now working together to bring users a Hive-on-Spark option that combines the best elements of both.

Apache Hive is a popular SQL interface for batch processing and ETL using Apache Hadoop. Until recently, MapReduce was the only execution engine in the Hadoop ecosystem, and Hive queries could only run on MapReduce. But today, alternative execution engines to MapReduce are available — such as Apache Spark and Apache Tez (incubating).

Although Spark is relatively new to the Hadoop ecosystem, its adoption has been meteoric. An open-source cluster computing framework for data analytics, Spark is built outside of Hadoop’s two-stage MapReduce paradigm but runs on top of HDFS. Thanks to that more general execution model, Spark has quickly gained momentum and established itself as an attractive choice for the future of data processing in Hadoop.

In this post, you’ll get an overview of the motivations and technical details behind some very exciting news for Spark and Hive users: the Hive and Spark communities are joining forces to collaboratively introduce Spark as a new execution engine option for Hive, alongside MapReduce and Tez (see HIVE-7292).

Motivation and Approach

Here are the two main motivations for enabling Hive to run on Spark:

  • To improve the Hive user experience
    Hive queries will run faster, thereby improving user experience. Furthermore, users will have access to a robust, non-MR execution engine that has already proven itself to be a leading option for data processing as well as streaming, and which is among the most active projects across all of Apache from contributor and commit standpoints.
  • To streamline operational management for Spark shops
    Hive-on-Spark will be very valuable for users who are already using Spark for other data-processing and machine-learning needs. Standardizing on one execution back end is also convenient for operational management, making it easier to debug issues and create enhancements.

Superficially, this project’s goals look similar to those of Shark or Spark SQL, which are separate projects that reuse the Hive front end to run queries using Spark. However, this design adds Spark into Hive, parallel to MapReduce and Tez, as another backend execution engine. Thus, existing Hive jobs can run on Spark transparently, without modification.

The key advantage of this approach is to leverage all the existing integration on top of Hive, including ODBC/JDBC, auditing, authorization, and monitoring. Another advantage is that it will have no impact on Hive’s existing code path and thus no functional or performance effects. Users choosing to run Hive on either MapReduce or Tez will have the same functionality and code paths they have today — thus, the Hive user community will be in the great position of being able to choose among MapReduce, Tez, or Spark as a back end. In addition, maintenance costs will be minimized so the Hive community needn’t make specialized investments for Spark.

Meanwhile, users opting for Spark as the execution engine will automatically have all the rich functional features that Hive provides. Future features (such as new data types, UDFs, logical optimization, and so on) added to Hive should become automatically available to those users, without any customization work needed in Hive’s Spark execution engine.

Overall Functionality

To use Spark as an execution engine in Hive, you would set the following:

set hive.execution.engine=spark;

The default value for this configuration is still “mr”. Hive will continue to work on MapReduce as-is on clusters that don’t have Spark on them. When Spark is configured as Hive’s execution engine, a few new configuration variables will be introduced, such as the master URL of the Spark cluster.

The new execution engine should support all Hive queries without any modification. Query results should be functionally equivalent to those from either MapReduce or Tez. In addition, existing functionality related to the execution engine should also be available, including the following:

  • Hive will display a task execution plan similar to the one displayed by the explain command for MapReduce and Tez.
  • Hive will give appropriate feedback to the user about progress and completion status of the query when running queries on Spark.
  • The user will be able to get statistics and diagnostic information as before (counters, logs, and debug info on the console).

Hive-Level Design

As noted previously, this project takes a different approach from that of Shark in that SQL semantics will not be implemented using Spark’s primitives, but rather using MapReduce ones that will be executed in Spark.

The main work to implement the Spark execution engine for Hive has two components: query planning, where the Hive operator plan generated by the semantic analyzer is further translated into a task plan that Spark can execute; and query execution, where the generated Spark plan is executed in the Spark cluster. There are other miscellaneous yet indispensable functional pieces involving monitoring, counters, statistics, and so on, but for brevity, we will only address the main design considerations here.

Query Planning

Currently, for a given user query, Hive’s semantic analyzer generates an operator plan that comprises a graph of logical operators such as TableScanOperator, ReduceSink, FileSink, GroupByOperator, and so on. MapReduceCompiler compiles a graph of MapReduceTasks and other helper tasks (such as MoveTask) from the logical operator plan. The Tez compiler behaves similarly, but generates a TezTask that combines what would otherwise be multiple MapReduce tasks into a single Tez task.

For Spark, we will introduce SparkCompiler, parallel to MapReduceCompiler and TezCompiler. Its main responsibility is to compile the Hive logical operator plan into a plan that can be executed on Spark. We will thus have SparkTask, depicting a job that will be executed in a Spark cluster, and SparkWork, describing the plan of a Spark task; SparkCompiler translates Hive’s operator plan into a SparkWork instance. During task plan generation, SparkCompiler may also perform physical optimizations suitable for Spark.
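
To make this concrete, here is a minimal, hypothetical sketch of how these pieces might fit together. The class shapes below are illustrative stand-ins rather than Hive’s actual API; MapWork and ReduceWork already exist in Hive as plan fragments, and SparkWork simply ties them together for one Spark job.

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Illustrative placeholders for Hive's existing plan fragments.
class MapWork implements Serializable { /* map-side operator tree */ }
class ReduceWork implements Serializable { /* reduce-side operator tree */ }

// SparkWork: the plan of one Spark job, i.e. a map stage plus its reduce stages.
class SparkWork implements Serializable {
    MapWork mapWork;
    List<ReduceWork> reduceWorks = new ArrayList<>();
}

// SparkTask: wraps a SparkWork so that Hive's task framework can schedule it.
class SparkTask {
    final SparkWork work;
    SparkTask(SparkWork work) { this.work = work; }
}

// SparkCompiler: walks Hive's logical operator plan, splits it at each
// ReduceSinkOperator into MapWork/ReduceWork pieces, and emits a SparkTask.
class SparkCompiler {
    SparkTask compile(Object hiveOperatorPlan) {
        SparkWork sparkWork = new SparkWork();
        // ... operator-graph traversal and Spark-specific physical
        // optimizations are omitted in this sketch ...
        return new SparkTask(sparkWork);
    }
}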

Job Execution

A SparkTask instance can be executed by Hive’s task execution framework in the same way as for other tasks. Internally, the SparkTask.execute() method will make RDDs and functions out of a SparkWork instance, and submit the execution to the Spark cluster via a Spark client.
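
Schematically, and continuing the hypothetical sketch above (the Spark client handle and the return-code convention are simplifications, not Hive’s real Task API), the execute() flow might look like this:

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical, heavily simplified view of SparkTask execution.
class SparkTaskExecutionSketch {
    private final SparkWork work;   // plan produced by the SparkCompiler sketch above

    SparkTaskExecutionSketch(SparkWork work) { this.work = work; }

    // Hive's task execution framework would invoke this like any other task.
    int execute(JavaSparkContext sparkClient) {
        // 1. Create RDDs over the input table's files ("Table as RDD" below).
        // 2. Build a MapFunction from MapWork and a ReduceFunction from each
        //    ReduceWork, then chain them with shuffle transformations.
        // 3. Submit the job through the Spark client, monitor its progress
        //    (next section), and return 0 on success or non-zero on failure.
        return 0;
    }
}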

Once the Spark work is submitted to the Spark cluster, the Spark client will continue to monitor the job execution and report progress. A Spark job can be monitored via SparkListener APIs.

Using the SparkListener APIs, we will add a SparkJobMonitor class that handles printing of status as well as reporting of the final result. This class provides functions similar to those of HadoopJobExecHelper for MapReduce processing and TezJobMonitor for Tez job processing, and it will also retrieve and print the top-level exception thrown at execution time in case of job failure.
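
As a rough illustration of listener-based monitoring (assuming a Spark release in which SparkListener can be extended directly from Java; the class below is a simplified stand-in, not Hive’s actual SparkJobMonitor):

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerStageCompleted;

// Prints stage progress and the final job result, in the spirit of
// HadoopJobExecHelper and TezJobMonitor.
class SparkJobMonitorSketch extends SparkListener {
    @Override
    public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
        System.out.println("Finished stage " + stageCompleted.stageInfo().name()
                + " (" + stageCompleted.stageInfo().numTasks() + " tasks)");
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        // On failure, Hive would also fetch and print the top-level exception.
        System.out.println("Job " + jobEnd.jobId() + " finished: " + jobEnd.jobResult());
    }
}

Such a monitor would be registered on the SparkContext (for example via SparkContext.addSparkListener()) before the job is submitted.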

Spark job submission is done via a SparkContext object that’s instantiated with the user’s configuration. When a SparkTask is executed by Hive, such a context object is created in the current user session. With the context object, RDDs corresponding to Hive tables are created, and MapFunction and ReduceFunction (more details below) are built from Hive’s SparkWork and applied to the RDDs. Job execution is triggered by applying a foreach() action on the RDDs with a dummy function.
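
Heavily simplified, that execution path could look something like the sketch below. The master URL, warehouse path, and input format are placeholders for whatever the user session and table metadata actually provide; the Spark calls themselves (hadoopRDD, foreach) are part of Spark’s public Java API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

class JobTriggerSketch {
    public static void main(String[] args) {
        // Build the Spark context from the user's session configuration;
        // "local" is only a stand-in for the configured master URL.
        SparkConf conf = new SparkConf().setAppName("hive-on-spark-sketch").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // An RDD corresponding to a Hive table: read the table's folder on
        // HDFS with its input format (plain text is assumed here).
        JobConf jobConf = new JobConf();
        FileInputFormat.addInputPath(jobConf, new Path("/user/hive/warehouse/t"));
        JavaPairRDD<LongWritable, Text> table =
                sc.hadoopRDD(jobConf, TextInputFormat.class, LongWritable.class, Text.class);

        // The MapFunction/ReduceFunction built from SparkWork would be applied
        // to this RDD here (see the following sections). Execution is then
        // triggered with a dummy, no-op function:
        table.foreach(record -> { });

        sc.stop();
    }
}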

Main Design Considerations

Hive’s operator plan is based on the MapReduce paradigm, and traditionally, a query’s execution comprises a list of MapReduce jobs. Each MapReduce job consists of map-side processing starting from Hive’s ExecMapper and reduce-side processing starting from ExecReducer, and MapReduce provides the inherent shuffling, sorting, and grouping between the map side and the reduce side. The inputs to the whole processing pipeline are the folders and files corresponding to the table.

Because we will reuse Hive’s operator plan but perform the same data processing in Spark, the execution plan will be built from Spark constructs such as RDDs, functions, and transformations. This approach is outlined below.

Table as RDD

A Hive table is simply a bunch of files and folders on HDFS. Spark primitives are applied to RDDs. Thus, naturally, Hive tables will be treated as RDDs in the Spark execution engine.

MapFunction

The above-mentioned MapFunction will be made from MapWork; specifically, the operator chain starting from the ExecMapper.map() method. The ExecMapper class implements the MapReduce Mapper interface, but its implementation in Hive contains some code that can be reused for Spark. Therefore, we will extract that common code into a separate class, MapperDriver, to be shared by MapReduce as well as Spark.
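
As an illustration of the shape such a function could take, here is a minimal sketch. MapperDriver does not exist yet, so the tiny interface below is a hypothetical stand-in for the map-side driver code extracted from ExecMapper, and the function itself is shown without committing to a specific Spark function interface:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the shared map-side driver extracted from
// ExecMapper: it pushes one input row through the map-side operator chain
// and collects whatever that chain emits (typically rows keyed for a shuffle).
interface MapperDriverSketch<IN, OUT> {
    void processRow(IN row, List<OUT> collector);
    void close(List<OUT> collector);   // flush any buffered state
}

// The MapFunction applied to each partition of the table RDD: it feeds the
// partition's rows through the driver and returns the emitted records.
class MapFunctionSketch<IN, OUT> {
    private final MapperDriverSketch<IN, OUT> driver;

    MapFunctionSketch(MapperDriverSketch<IN, OUT> driver) { this.driver = driver; }

    public Iterator<OUT> call(Iterator<IN> partition) {
        List<OUT> output = new ArrayList<>();
        while (partition.hasNext()) {
            driver.processRow(partition.next(), output);
        }
        driver.close(output);
        return output.iterator();
    }
}

In Spark terms, a function of this shape would typically be plugged in through mapPartitions() or, when its output is keyed for a shuffle, mapPartitionsToPair().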

ReduceFunction

Similarly, ReduceFunction will be made from the ReduceWork instance in SparkWork. To Spark, ReduceFunction is no different from MapFunction, but the function’s implementation will differ, being built from the operator chain starting from ExecReducer.reduce(). Also, because some code in ExecReducer can be reused, we will extract that common code into a separate class, ReducerDriver, for sharing by both MapReduce and Spark.
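
A corresponding reduce-side sketch, again with a hypothetical ReducerDriver stand-in; the only structural difference is that, after the shuffle, the input arrives as a key together with its grouped values:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import scala.Tuple2;

// Hypothetical stand-in for the shared reduce-side driver extracted from
// ExecReducer: it pushes one (key, grouped values) pair through the
// reduce-side operator chain, which ends in a FileSink or another ReduceSink.
interface ReducerDriverSketch<K, V, OUT> {
    void processGroup(K key, Iterable<V> values, List<OUT> collector);
    void close(List<OUT> collector);
}

// The ReduceFunction applied to each partition of the shuffled, grouped RDD.
class ReduceFunctionSketch<K, V, OUT> {
    private final ReducerDriverSketch<K, V, OUT> driver;

    ReduceFunctionSketch(ReducerDriverSketch<K, V, OUT> driver) { this.driver = driver; }

    public Iterator<OUT> call(Iterator<Tuple2<K, Iterable<V>>> partition) {
        List<OUT> output = new ArrayList<>();
        while (partition.hasNext()) {
            Tuple2<K, Iterable<V>> group = partition.next();
            driver.processGroup(group._1(), group._2(), output);
        }
        driver.close(output);
        return output.iterator();
    }
}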

Shuffle, Group, and Sort

While this functionality comes for “free” along with MapReduce, we will need to provide an equivalent for Spark. Fortunately, Spark provides a few transformations that are suitable for replacing MapReduce’s shuffle capability, such as partitionBy, groupByKey, and sortByKey. The partitionBy transformation does pure shuffling (no grouping or sorting), groupByKey does shuffling and grouping, and sortByKey does shuffling plus sorting. Therefore, for each ReduceSinkOperator in SparkWork, we will need to inject one of these transformations.

Having the capability to selectively choose the exact shuffling behavior provides opportunities for optimization. For instance, Hive’s groupBy doesn’t require the keys to be sorted, but MapReduce sorts them anyway. In Spark, by contrast, one can choose sortByKey only when key order is important (such as for SQL ORDER BY).
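
For example, given the keyed output of a MapFunction, the transformation injected for a ReduceSinkOperator might be chosen roughly as follows (partitionBy, groupByKey, and sortByKey are existing JavaPairRDD operations; the surrounding selection logic is only illustrative):

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;

class ShuffleChoiceSketch {
    // One of these is injected for each ReduceSinkOperator in SparkWork,
    // depending on what the downstream operators actually need.
    static void demonstrate(JavaPairRDD<String, Integer> mapOutput, int numReducers) {
        // Pure shuffle: repartition by key, with no grouping or sorting.
        JavaPairRDD<String, Integer> shuffled =
                mapOutput.partitionBy(new HashPartitioner(numReducers));

        // Shuffle + group: all values for a key arrive together (e.g., GROUP BY).
        JavaPairRDD<String, Iterable<Integer>> grouped = mapOutput.groupByKey(numReducers);

        // Shuffle + sort: keys arrive in sorted order (e.g., ORDER BY).
        JavaPairRDD<String, Integer> sorted = mapOutput.sortByKey();
    }
}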

Multiple Reduce Stages

Whenever a query has multiple ReduceSinkOperator instances, Hive will break the plan apart and submit one MR job per sink. All the MR jobs in this chain need to be scheduled one by one, and each job has to re-read the output of the previous job from HDFS and shuffle it. In Spark, these intermediate HDFS writes and re-reads are unnecessary: multiple map and reduce functions can be concatenated into a single job. For each ReduceSinkOperator, a proper shuffle transformation needs to be injected, as explained above.
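
As a sketch of that chaining, a query with two reduce stages could run as one Spark job in which the second shuffle consumes the first stage’s output directly; the tiny lambdas below are placeholders for what would really be Hive’s operator pipelines:

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

class ChainedStagesSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("chained-stages-sketch").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Placeholder input standing in for a Hive table RDD.
        JavaRDD<String> table = sc.parallelize(Arrays.asList("a 1", "b 2", "a 3"));

        // Stage 1: map, shuffle + group, then reduce (placeholder logic).
        JavaPairRDD<String, Integer> keyed = table.mapToPair(line -> new Tuple2<>(
                line.split(" ")[0], Integer.parseInt(line.split(" ")[1])));
        JavaPairRDD<String, Iterable<Integer>> grouped = keyed.groupByKey();
        JavaPairRDD<String, Integer> stage1 = grouped.mapToPair(group -> {
            int sum = 0;
            for (Integer v : group._2()) sum += v;
            return new Tuple2<>(group._1(), sum);
        });

        // Stage 2 consumes stage 1 directly: another shuffle (a sort here),
        // with no intermediate HDFS write between the two reduce stages.
        stage1.sortByKey().foreach(row -> System.out.println(row._1() + "\t" + row._2()));

        sc.stop();
    }
}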

Conclusion

Based on the above, you will likely recognize that although Hive on Spark is simple and clean in terms of functionality and design, the implementation will take some time. Therefore, the community will take a phased approach, with all basic functionality delivered in Phase 1 and optimizations and improvements continuing over a longer period of time. (The precise number of phases, and what each will entail, are under discussion.)

Most important, the Hive and Spark communities will work together closely to achieve this technical vision and resolve any obstacles that might arise. The end result will be that Hive users gain an execution engine that improves performance and unifies batch and stream processing.

We invite you to follow our progress, on which we will offer periodic updates!

Xuefu Zhang is a Software Engineer at Cloudera and a Hive PMC member.

8 Responses
  • Ciaran / July 08, 2014 / 3:25 AM

    Just to confirm “Apache Hive is a popular SQL interface for batch processing and ETL using Apache Hadoop.”

    - SQL or SQL-like?

    Thanks

  • Diwakar / July 29, 2014 / 11:32 PM

    From my point of view we can also add below points as motivations for enabling Hive to run on Spark:

    1 – Spark user benefits: This feature is very valuable to users who are already using Spark for other data processing and machine learning needs. Standardizing on one execution backend is convenient for operational management, and makes it easier to develop expertise to debug issues and make enhancements.

    2 – Greater Hive adoption: Following the previous point, this brings Hive into the Spark user base as a SQL on Hadoop option, further increasing Hive’s adoption.

    3 – Performance: Hive queries, especially those involving multiple reducer stages, will run faster, thus improving user experience as Tez does.

    It is not a goal for the Spark execution backend to replace Tez or MapReduce. It is healthy for the Hive project for multiple backends to coexist. Users have a choice whether to use Tez, Spark or MapReduce. Each has different strengths depending on the use case. And the success of Hive does not completely depend on the success of either Tez or Spark.

  • Pavan / August 01, 2014 / 8:50 AM

    So, what happens to impala? Isn’t impala supposed to perform better than hive per cloudera’s previous benchmark testing? Why not integrate spark with impala? Does this mean impala is going to be doomed?

  • paul yang / October 15, 2014 / 2:53 AM

    What version of CDH will Hive-on-Spark be included in?

  • James Yu / October 21, 2014 / 10:15 AM

    Hi, I really like the idea! When will it be available (in CDH), like 2015Q1 or 2015Q2? Thanks!
