Over the last couple of months the Hive team at Cloudera has been working hard to bring a bunch of exciting new features to Apache Hive. In this blog post, I’m going to talk about one such feature – Column Statistics in Hive – and how Hive’s query processing engine can benefit from it. The feature is currently a work in progress but we expect it to be available for review imminently.

## Motivation

While there are many possible execution plans for a query, some plans are more efficient than others. The query optimizer is responsible for choosing an efficient execution plan for a given SQL query from the space of all possible plans. Currently, Hive’s query optimizer uses rules of thumb to generate an efficient execution plan for a query. While such rule-of-thumb optimizations transform the query plan into a more efficient one, the resulting plan is not always the most efficient execution plan.

In contrast, the query optimizer in a traditional RDBMS is cost based; it uses the statistical properties of the input column values to estimate the cost of alternative query plans and chooses the plan with the lowest cost. The cost model assigns an estimated execution cost to each plan, based on the CPU and I/O costs of every operator in the plan. As an example, consider a query that represents a join among {A, B, C} with the predicate {A.x == B.x == C.x}. Assume table A has a total of 500 records, table B has a total of 6000 records, and table C has a total of 1000 records. In the absence of cost based query optimization, the system picks the join order specified by the user. In our example, let us further assume that joining A and B yields 2000 records and joining A and C yields 50 records. Hence the cost of joining A, B, and C without join reordering is the cost of joining A and B plus the cost of joining the output of (A Join B) with C. In our example this works out to (500 * 6000) + (2000 * 1000). On the other hand, a cost based optimizer (CBO) in an RDBMS would pick the cheaper order [(A Join C) Join B], resulting in a cost of (500 * 1000) + (50 * 6000). However, in order to pick the better join order the CBO needs cardinality estimates on the join column.
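The arithmetic in this example can be sketched with a toy cost model (a deliberately simplified one for illustration; a real optimizer’s cost model is richer), where the cost of a join is the product of its input cardinalities:

```python
# All cardinalities come from the example above.
rows = {"A": 500, "B": 6000, "C": 1000}
rows_a_join_b = 2000  # |A Join B|
rows_a_join_c = 50    # |A Join C|

def join_cost(left_rows, right_rows):
    # Toy cost model: cost of a join ~ product of input cardinalities.
    return left_rows * right_rows

# User-specified order: (A Join B) Join C
naive_cost = join_cost(rows["A"], rows["B"]) + join_cost(rows_a_join_b, rows["C"])
# CBO-chosen order: (A Join C) Join B
cbo_cost = join_cost(rows["A"], rows["C"]) + join_cost(rows_a_join_c, rows["B"])

print(naive_cost, cbo_cost)  # 5000000 800000
```

The CBO’s order is cheaper by more than a factor of six – but choosing it requires estimates of the intermediate cardinalities, which is exactly what column statistics give the optimizer.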

Today, Hive supports statistics at the table and partition level – count of files, raw data size, count of rows, etc. – but doesn’t support statistics on column values. These table and partition level statistics are insufficient for building a CBO because they don’t provide any information about the individual column values. Hence obtaining a statistical summary of the column values is the first step towards building a CBO for Hive.

In addition to join reordering, Hive’s query optimizer will be able to use column statistics to decide whether to perform a map-side aggregation, as well as to better estimate the cardinality of operators in the execution plan.

## New Statistics

The following statistics on columns have been added, as they apply to the datatype of the underlying column: count of null values, count of true/false values, maximum value, minimum value, estimate of the number of distinct values, average column length, maximum column length, and height balanced histograms.
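As an illustration, the per-type grouping might be modeled as follows – a Python sketch with hypothetical names, not Hive’s actual metastore classes:

```python
from dataclasses import dataclass

# Hypothetical groupings of the statistics listed above by column type.
# Class and field names are illustrative, not Hive's API.

@dataclass
class LongColumnStats:
    min_value: int
    max_value: int
    num_nulls: int
    num_distinct: int   # estimated, e.g. via an FM-style sketch

@dataclass
class StringColumnStats:
    avg_length: float
    max_length: int
    num_nulls: int
    num_distinct: int

@dataclass
class BooleanColumnStats:
    num_trues: int
    num_falses: int
    num_nulls: int
```

Note how min/max apply to numeric columns, lengths to strings, and true/false counts only to booleans – which is what “as they apply to the datatype” means above.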

## Computing Statistics

Initially, we plan to support statistics gathering through an explicit ANALYZE command. We have extended the ANALYZE command to compute statistics on columns both within a table and within a partition. The basic requirement for computing statistics is to stream the entire table data through mappers. Since some operations, such as INSERT and CTAS, already stream the table data through mappers, we could piggyback on them and compute statistics as part of those operations. We believe that by doing so, Hive may be able to compute statistics more efficiently than most RDBMSs, which don’t combine data loading and computing statistics into a single operation.

A statistical summary of the column data can be logically viewed as an aggregation of the column values rolled up by either the table or the partition. Hence we have implemented statistics computation using the generic user defined aggregate function (UDAF) framework in Hive. UDAFs in Hive are executed in two stages: as the records are streamed through the mappers, a partial aggregate maintained in each mapper is updated. The reducers then combine the partial aggregates to produce the final aggregate output.
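The two-stage execution can be sketched as follows – a simplified Python analogue of the pattern, not Hive’s actual UDAF code – for a partial aggregate that tracks min, max, and null count:

```python
# Sketch of a two-stage (map-side update, reduce-side merge) aggregate,
# mirroring how Hive UDAFs execute. Names are illustrative.

class ColumnStatsPartial:
    def __init__(self):
        self.min_val = None
        self.max_val = None
        self.null_count = 0
        self.row_count = 0

    def update(self, value):
        """Mapper side: fold one record into the partial aggregate."""
        self.row_count += 1
        if value is None:
            self.null_count += 1
            return
        if self.min_val is None or value < self.min_val:
            self.min_val = value
        if self.max_val is None or value > self.max_val:
            self.max_val = value

    def merge(self, other):
        """Reducer side: combine another mapper's partial aggregate."""
        self.row_count += other.row_count
        self.null_count += other.null_count
        if other.min_val is not None and (self.min_val is None or other.min_val < self.min_val):
            self.min_val = other.min_val
        if other.max_val is not None and (self.max_val is None or other.max_val > self.max_val):
            self.max_val = other.max_val
```

Each mapper holds one `ColumnStatsPartial` per column, so memory stays constant regardless of how many records stream through – the same property the real implementation relies on.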

Even though the stats task is run as a batch job, we want it to be executed as efficiently as possible. We expect to compute statistics on terabytes of data at a time, and hence to scan billions of records. Therefore it is very important that the algorithms we use for computing statistics use constant memory. We have invested significant time researching algorithms for this task and have selected ones which we think provide a good tradeoff between accuracy and efficiency, while also providing knobs for tuning. We have also attempted to structure the code in a pluggable fashion so that new algorithms can be easily substituted in the future.

## Storing Statistics

The statistics that are gathered through the ANALYZE command are persisted to avoid computing them every time the optimizer has to estimate the cost of a query plan. These newly added statistics are persisted in new tables that have been added to the metastore. We have also extended the metastore Thrift API to query the metastore for column statistics at the granularity of a partition as well as a table. Running the ANALYZE command on a column will overwrite any statistics that may already be present for the column.

## Algorithms for Computing Statistics

While computing some statistics such as average column length, minimum value, and maximum value is straightforward, computing some other statistics is not. For instance, computing the number of distinct column values accurately and efficiently is an interesting problem. While a deterministic counting algorithm will yield the exact number of distinct values in a column, such an approach is not feasible for all data types. For instance, counting the number of distinct values of a boolean column is feasible, whereas the amount of memory required to count the number of distinct string values could be prohibitive, especially when scanning terabytes of data. Instead, a number of randomized counting algorithms over data streams have been proposed that estimate the number of distinct values efficiently while maintaining practical bounds on accuracy. We have implemented one such algorithm [1], proposed by Flajolet and Martin, hereafter referred to as the FM algorithm.

The main idea behind the FM algorithm is to let each item in the data set select, at random, a bit in a bit vector V of length L and set it to 1, with a geometric distribution over the bit positions. The bit is selected using a hash function h that maps each column value uniformly at random to an integer in [0..2^L-1], and then determining the largest b such that the b rightmost bits of h(v) are all 0 – that is, the position of the least significant set bit of h(v).

The intuition behind the algorithm is as follows. Using a random hash function ensures that all elements with the same value set the same bit. For each distinct value, bit b in vector V is set with probability 1/2^(b+1). Hence we expect bit V[b] to be set if there are at least 2^(b+1) distinct values. The algorithm finds the smallest index Z such that bit Z is not set (while bit Z-1 is set). Hence there are likely more than 2^(Z-1) but fewer than 2^Z distinct values.

While a single bit vector yields estimates that may be off by a factor of 2–4, using 64 bit vectors with 64 different hash functions yields estimates that are within 10% of the true value. In our implementation we use 64 bit vectors, each of length 32 bits. Hence, using 256 bytes of memory – 32 * 64 bits – we are able to obtain estimates within 10% of the true value. Additionally, the FM algorithm is simple to implement and distributes nicely across a cluster of nodes, lending itself well to the MapReduce framework.
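A minimal Python sketch of the FM estimator as described above – 64 vectors of 32 bits each, with MD5 plus a seed standing in for a family of hash functions. This is an illustration of the technique, not Hive’s implementation; the bias-correction constant φ ≈ 0.77351 is the standard one from the FM paper:

```python
import hashlib

PHI = 0.77351     # standard FM bias-correction constant
NUM_VECTORS = 64  # number of hash functions / bit vectors
VECTOR_LEN = 32   # bits per vector

def _trailing_zeros(x, width):
    """Index of the lowest set bit of x, capped at width - 1."""
    if x == 0:
        return width - 1
    b = 0
    while x & 1 == 0:
        x >>= 1
        b += 1
    return min(b, width - 1)

def _hash(value, seed):
    """Map a value to a 32-bit integer; the seed simulates distinct hash functions."""
    digest = hashlib.md5(f"{seed}:{value}".encode()).digest()
    return int.from_bytes(digest[:4], "little")

def estimate_ndv(values):
    """Flajolet-Martin estimate of the number of distinct values."""
    vectors = [0] * NUM_VECTORS
    for v in values:
        for i in range(NUM_VECTORS):
            b = _trailing_zeros(_hash(v, i), VECTOR_LEN)
            vectors[i] |= 1 << b       # same value always sets the same bit
    # R_i = index of the lowest unset bit in vector i; average across vectors.
    total_r = 0
    for vec in vectors:
        r = 0
        while vec & (1 << r):
            r += 1
        total_r += r
    avg_r = total_r / NUM_VECTORS
    return (2 ** avg_r) / PHI
```

Note that the bit vectors themselves are what the mappers would maintain and the reducers would combine – merging two partial sketches is just a bitwise OR, which is why the algorithm distributes so cleanly.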

## Efficient Data Structures

As a test run, we attempted to compute statistics on a table containing 80+ columns and around 500 GB of data on a 10 node test cluster. We modeled the table after the fact table that is typical of a star schema in a data warehousing environment. After watching the statistics computation statement run for a day we realized that it would take another three days to complete. In search of the bottleneck we analyzed the code using YourKit, an awesome Java profiler, and quickly determined that there were problems with Java’s built-in BitSet class. We swapped it out for Javolution’s FastBitSet and saw a 10x improvement in performance. At this time, we are looking to replace some of the other built-in Java data structures with Javolution’s more efficient equivalents.

## Wrapping Up

In this blog post, we presented the new statistics we have added to Hive, along with the details of the algorithms and data structures we have used to compute the distinct value count. We also saw how Hive’s query processing engine can take advantage of these new statistics to process queries more efficiently.

The theme of a lot of the current work we are doing on Hive here at Cloudera is to make Hive adopt more of the good characteristics of traditional RDBMSs (ease of use, efficient utilization of hardware resources via a CBO, security, data management, JDBC/ODBC connectivity, etc.), while avoiding their pitfalls by building on top of a distributed computation framework that provides fault tolerance, the ability to scale out linearly, and the opportunity to work with both structured and unstructured data.

As I said before, we are working on a bunch of other Hive features as well, so watch out for a post from my colleague Carl Steinbach on other new features, particularly HiveServer2.

[1] Flajolet, P. and Martin, G.N. “Probabilistic Counting Algorithms for Data Base Applications.” Journal of Computer and System Sciences, 1985.

Cool! Is there a JIRA # for this?

Thanks Ashwin. HIVE-1362 tracks the column stats work.


Great post! The use of the FM sketch for probabilistically estimating the distinct values in columns is interesting..

Thanks Prasanth!

Great post Shreepadma, I have a couple of questions.

One, how large are the column level statistics you are generating? Obviously it varies by the number of columns, but I wonder if you have an estimate per column. How will this affect the size requirements of the Hive metastore?

Two, do you have a guess at how much time this will add to loading data? I agree it would be good to auto-generate stats as the data is loaded. I was just wondering if it would be possible to gather the more complex statistics (such as distinct values and histograms) without impacting load times.

@Alan: We don’t expect this feature to bloat up the metastore. Column statistics are a summary of the column data and are hence several orders of magnitude smaller than the actual data. All of the statistics are of constant size and don’t depend on the size of the underlying data. For instance, for a long column, we store min, max, number of distinct values, number of null values and equi-depth histograms. If the user requested 20 histogram buckets, we would store a total of (8 + 8 + 8 + 8 + 8*2*20) bytes of data. Note that this number is independent of the underlying table/data size.

At this time, we don’t have accurate numbers to ascertain the impact auto stats gathering will have on data load time. Auto stats gathering will not be turned on by default, and we can certainly add knobs to control the different statistics in a fine-grained manner. We can add a config that requires histograms and NDVs to be explicitly turned on by the admin. By default it will be off.

As I understand it, in the first step (Hive 0.10.0) only the support for gathering statistics will be provided. Any idea when the CBO will be implemented?