Using Impala at Scale at Allstate

Our thanks to Don Drake (@dondrake), an independent technology consultant who is currently working as a Principal Big Data Consultant at Allstate Insurance, for the guest post below about his experiences with Impala.

It started with a simple request from one of the managers in my group at Allstate to put together a demo of Tableau connecting to Cloudera Impala. I had previously worked with Impala on a large dataset about a year earlier, while it was still in beta, and I was curious to see how it had improved since then in features and stability.

In this post, I’ll share my experiences with Impala during the process of building my demo. As you’ll see below, I learned that unlike traditional computing environments, which can’t effectively process the full spectrum of your records and thereby limit the power of your data, Impala does not force you to narrow that spectrum.

Generating a Dataset

I immediately began to brainstorm a good dataset, thinking it should be based on internal data that is large enough to stress Impala and our CDH cluster. 

I have been working on a project that calculates earned premiums across a large dataset as a Hadoop Streaming job. This job provides ad hoc reporting with hundreds of possible dimensions to choose from as well as a handful of facts. My thought was to take the output of this job using all dimensions and all facts, as that would yield a dataset that is very wide as well as rich in rows. I chose numerous years of input for my Streaming job and generated a dataset with more than 800 columns and approximately 1.1 billion rows stored in HDFS as an uncompressed CSV file. This file was 2.3TB in size.  

This output represented a countrywide dataset spanning numerous years, an order of magnitude or two larger than what our user community normally processes. Once loaded into Impala, this would be a great dataset to showcase query performance as well as Tableau reporting.

Loading the Dataset

After you have a dataset sitting in a directory in HDFS, the fastest way to start using it in Impala (or even Hive) is through external tables. Basically, you create a table with the EXTERNAL keyword, supply column names and data types as usual, and then reference the table’s location in HDFS. This is a schema-on-read scenario, so although the CREATE TABLE statement might succeed, you won’t discover a mistake in the table definition until you query and inspect the data.

The following is a subset of my CREATE TABLE DDL, which I executed by pasting it into impala-shell:
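
The real DDL runs to more than 800 columns, so the sketch below only suggests its shape: the table name cdf and the column names (other than ACTYR, which appears later in the post) are illustrative placeholders, and a single DOUBLE fact column stands in for the many facts.

    CREATE EXTERNAL TABLE cdf (
      ACTYR        STRING,   -- accounting year (dimension)
      GEO_AREA     STRING,   -- geographic area (dimension)
      LINE_OF_BUS  STRING,   -- line of business (dimension)
      COMPANY      STRING,   -- company (dimension)
      POLICY_NBR   STRING,   -- policy number (dimension)
      -- ... hundreds of additional dimension columns ...
      EARNED_PREM  DOUBLE    -- earned premium (fact)
      -- ... remaining fact columns ...
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LOCATION '/user/drake/cdf_impala';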

 

(Note: For aggregation/demo purposes, I have defined only facts, not dimensions, as decimals in the schema here.)

All queries were run on a 35-node Hadoop cluster running CDH 4.6 and Impala 1.2.4. The cluster was lightly loaded running MapReduce jobs while I ran the following queries. (With each SQL statement, I will also show the EXPLAIN PLAN output, as this will provide some insight into how Impala is constructing the execution strategy for the query.) After I created a table referencing our data in the HDFS directory /user/drake/cdf_impala, I checked to see how many rows are available:
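
The row count itself is a one-liner, shown here against the placeholder table name cdf from the sketch above, with EXPLAIN run first to see the plan:

    EXPLAIN SELECT COUNT(*) FROM cdf;
    SELECT COUNT(*) FROM cdf;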

 

So, I had a 1.1 billion row dataset and it took just under three minutes for Impala to tell me that. The explain plan shows that it had to scan 2.28TB of data.

I consider that response time to be quite fast, but “fast” is also a relative term. For comparison, the exact same Hive query took eight minutes to return a row count. 

Next, I ran a more difficult query for Impala: a distinct count of geographic areas by year:
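
Using the placeholder column names from earlier, that query looks roughly like this:

    -- distinct count of geographic areas, broken out by accounting year
    SELECT ACTYR, COUNT(DISTINCT GEO_AREA) AS geo_areas
    FROM cdf
    GROUP BY ACTYR;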

 

From the explain plan, you can see Impala had to scan the entire dataset before it could aggregate. The query time came down a little compared to the row-count query, probably because of disk caching on the data nodes.

Then, I tweaked the query to look at only a single year’s results. As you can see below, even when you limit the query to one year, the explain plan shows that it must still scan the entire dataset:
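
Roughly, it is the same query with a year filter added; the literal '111' is borrowed from the ACTYR example value mentioned later in the post.

    SELECT ACTYR, COUNT(DISTINCT GEO_AREA) AS geo_areas
    FROM cdf
    WHERE ACTYR = '111'   -- restrict to a single accounting year
    GROUP BY ACTYR;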

 

This looks pretty good. I got aggregates on the entire dataset as well as a subset of it in minutes!

But wait: these queries can be improved by taking advantage of some features available in Impala, specifically partitioning and Parquet.

Optimization using Partitions and Parquet

Data partitioning is not unique to Impala; it is available in just about every RDBMS and improves performance by physically separating data based on the values of one or more columns. For example, if normal query patterns show that most queries on a table look at only a single year (as in our previous example), and you partition the table by year, the query planner can simply ignore the other years’ data that your query doesn’t need. That happens to be the case for our dataset, so we will partition it by a set of known, frequently filtered columns.

Parquet support is where Impala really shines. Parquet is an open source serialization format that stores data in a binary, column-oriented fashion. In a row-oriented format, all the columns for one row are stored together, followed by the next row, and so on; Parquet turns this on its head. It takes a group of records (a row group) and stores the values of the first column together for the entire group, followed by the values of the second column, and so on. Parquet is optimized for scanning individual columns, so it doesn’t have to read the entire row group if you are only interested in a subset of columns.

This pays off when a column has low cardinality, as with our ACTYR column, where millions of rows share the value "111". Parquet can use old-school compression strategies such as run-length encoding (RLE) and dictionary encoding, as well as modern compression codecs like Snappy. As a result, the data compresses at a very high rate, which reduces I/O and further reduces query times.

Let’s put these two features to use. Lucky for us, it is really easy to add support for both by adding a few lines of DDL to our CREATE TABLE statement. Our strategy is to create a new table that is partitioned and stored as Parquet, and then load our existing data into it.
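
A sketch of what the new table definition looks like, again using placeholder names (cdf_part for the new table). Note that the four partition columns are declared only in the PARTITIONED BY clause, not in the regular column list.

    CREATE TABLE cdf_part (
      POLICY_NBR   STRING,
      -- ... remaining non-partition dimension and fact columns ...
      EARNED_PREM  DOUBLE
    )
    PARTITIONED BY (
      ACTYR        STRING,   -- accounting year
      GEO_AREA     STRING,   -- geographic area
      LINE_OF_BUS  STRING,   -- line of business
      COMPANY      STRING    -- company
    )
    STORED AS PARQUETFILE;   -- newer Impala releases also accept STORED AS PARQUET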

 

Our DDL for this new table has two additional clauses. The first tells Impala to partition the data on four columns: accounting year, geographic area, line of business, and company. Impala handles putting data into the correct partition and uses these partitions to optimize queries.

The second clause tells Impala to store the data in the Parquet serialization format. By default, Impala uses Snappy compression for Parquet files.

With a few additional lines of DDL, our new table is now partitioned, stored as Parquet, and compressed. As you can see, using Parquet with Impala is pretty easy.

Reload

Now that our new table is ready to go, let’s populate it.
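
With the placeholder names again, the load looks roughly like this:

    INSERT INTO TABLE cdf_part
    PARTITION (ACTYR, GEO_AREA, LINE_OF_BUS, COMPANY)   -- no values given: dynamic partitioning
    SELECT
      POLICY_NBR,
      -- ... remaining non-partition columns, in the same order as the target table ...
      EARNED_PREM,
      ACTYR, GEO_AREA, LINE_OF_BUS, COMPANY             -- partition columns must come last
    FROM cdf;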

 

The INSERT syntax names the partition columns in a PARTITION clause. We do not specify any values for them in the INSERT statement because we are using dynamic partitioning, which partitions the data automatically based on the values coming out of the SELECT statement. One trick to note is that the partition columns must appear at the end of the column list in the SELECT statement.

This simple INSERT statement is doing a lot under the covers: partitioning the data on disk and creating the Parquet-based, Snappy-compressed data files. It took six hours for this INSERT statement to complete.

In the next section, we will see if this extra processing is worth it.

Time to Query

I’ll re-run the previous queries against the newly partitioned table as well as run an EXPLAIN plan before running the query to see what the optimizer is doing.
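
First, the row count again, this time against the placeholder partitioned table cdf_part:

    EXPLAIN SELECT COUNT(*) FROM cdf_part;
    SELECT COUNT(*) FROM cdf_part;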

 

As you can see, we have a dramatic improvement over the original Impala table. The explain plan shows that there are now 787 partitions in this table and that the total size of the table is 106GB, down from 2.28TB.

The first time I saw these numbers I thought for sure I had made a mistake in creating the table: could the data really have shrunk from 2.28TB to 106GB? To prove to myself that a billion-plus rows were still represented in this dataset, I ran counts on distinct policy numbers to verify it. Only then was I confident that the table could be used for further queries.

Let’s see how our query on geographic regions performs on the new table.
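
The same aggregate, pointed at the partitioned table:

    SELECT ACTYR, COUNT(DISTINCT GEO_AREA) AS geo_areas
    FROM cdf_part
    GROUP BY ACTYR;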

 

The explain plan shows a full-table scan again (but this time on 106GB of data) and now the query returns in under 10 seconds. We went from calculating aggregates in minutes to seconds!

Now we run the GROUP BY query and filter on a single year again:
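
And the filtered version against the partitioned table, which lets the planner prune partitions:

    SELECT ACTYR, COUNT(DISTINCT GEO_AREA) AS geo_areas
    FROM cdf_part
    WHERE ACTYR = '111'   -- single accounting year; only matching partitions are scanned
    GROUP BY ACTYR;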

 

Here you can see that the optimizer used 198 of the 787 partitions and only had to read 24GB of data. This query on the other table took just over two minutes to complete, compared to just under two seconds on our new table.

Conclusion

With some assistance from a colleague I was able to do a live demo of Tableau against the partitioned table with reports presented in seconds. The demo was so fast, in fact, that I had to remind the group a few times that we were analyzing a billion-row dataset with a few clicks.

From the above, you should now realize that running queries against an Impala data store is fast and scales as your cluster scales. Furthermore, by including partitioning and Parquet, dramatic improvements in query time are possible with minimal effort.

Marrying our most powerful source of data with this technology can put critical information quickly in the hands of our decision makers. Our data source is very complex; it requires specialized skills to stage, merge, and summarize data into actionable information. However, with Impala, we can make objective, customer-focused decisions by providing trusted actionable information to decision makers at record speed, and also allow them to hand-select which information is important to them. 

Now, I’m looking for a larger dataset!

Don Drake (@dondrake) is an independent technology consultant in the Chicago area. He can be reached at don@drakeconsulting.com.

3 Responses
  • Wes Floyd / May 18, 2014 / 1:50 PM

    Would you run the same test on Hive 0.13 with ORC and Tez and post the results?

  • Vik / May 19, 2014 / 5:58 PM

    @Wes great question, that would be an ideal comparison. Eagerly awaiting the results!
