How-to: Prepare Unstructured Data in Impala for Analysis

Learn how to build an Impala table around data that comes from non-Impala, or even non-SQL, sources.

As data pipelines start to include more varied inputs, such as NoSQL stores or loosely specified schemas, you might encounter situations where you have data files (particularly in Apache Parquet format) but do not know the precise table definition. This tutorial shows how you can build an Impala table around data that comes from non-Impala or even non-SQL sources, where you do not have control of the table layout and might not be familiar with the characteristics of the data.

The data used in this tutorial represents airline on-time arrival statistics, from October 1987 through April 2008. (See the details on the 2009 ASA Data Expo web site.) You can also see the explanations of the columns; for purposes of this exercise, wait until after following the tutorial before examining the schema, to better simulate a real-life situation where you cannot rely on assumptions and assertions about the ranges and representations of data values.

Wrangling the Data

First, we download and unpack the data files. There are eight files totalling 1.4GB. Each file is less than 256MB in size.

Next, we put the Parquet data files in HDFS, all together in a single directory, with permissions on the directory and the files so that the impala user will be able to read them. (Note: After unpacking, we saw the largest Parquet file was 253MB. When copying Parquet files into HDFS for Impala to use, for maximum query performance, make sure that each file resides in a single HDFS data block. Therefore, we pick a size larger than any single file and specify that as the block size, using the argument -Ddfs.block.size=256m on the hdfs dfs -put command.)
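
A minimal sketch of that step, assuming the Parquet files were unpacked into the current directory and using a hypothetical staging path of /user/impala/staging/airlines:

    # Hypothetical staging path; adjust to your environment.
    hdfs dfs -mkdir -p /user/impala/staging/airlines
    # A 256MB block size keeps each Parquet file in a single HDFS block.
    hdfs dfs -Ddfs.block.size=256m -put *.parquet /user/impala/staging/airlines
    # Make the directory and files readable by the impala user.
    hdfs dfs -chmod -R 755 /user/impala/staging/airlines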

With the files in an accessible location in HDFS, we create a database table that uses the data in those files. The CREATE EXTERNAL TABLE syntax and the LOCATION attribute point Impala at the appropriate HDFS directory. The LIKE PARQUET 'path_to_any_parquet_file' clause means we skip the list of column names and types; Impala automatically gets the column names and data types straight from the data files. (Currently, this technique only works for Parquet files.) We ignore the warning about lack of READ_WRITE access to the files in HDFS; the impala user can read the files, which will be sufficient for us to experiment with queries and perform some copy and transform operations into other tables.
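
A sketch of that statement, using the hypothetical staging path from above and a placeholder file name for the LIKE PARQUET clause:

    CREATE EXTERNAL TABLE airlines_external
      LIKE PARQUET '/user/impala/staging/airlines/one_of_the_data_files.parq'
      STORED AS PARQUET
      LOCATION '/user/impala/staging/airlines';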

Confirming the Data

With the table created, we examine its physical and logical characteristics to confirm that the data is really there and in a format and shape that we can work with. The SHOW TABLE STATS statement gives a very high-level summary of the table, showing how many files and how much total data it contains. Also, it confirms that the table is expecting all the associated data files to be in Parquet format. (The ability to work with all kinds of HDFS data files in different formats means that it is possible to have a mismatch between the format of the data files, and the format that the table expects the data files to be in.)

The SHOW FILES statement confirms that the data in the table has the expected number, names, and sizes of the original Parquet files. The DESCRIBE statement (or its abbreviation DESC) confirms the names and types of the columns that Impala automatically created after reading that metadata from the Parquet file. The DESCRIBE FORMATTED statement prints out some extra detail along with the column definitions; the pieces we care about for this exercise are the containing database for the table, the location of the associated data files in HDFS, the fact that it’s an external table so Impala will not delete the HDFS files when we finish the experiments and drop the table, and the fact that the table is set up to work exclusively with files in the Parquet format.
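
The statements referred to in this section, run against the table created above:

    SHOW TABLE STATS airlines_external;
    SHOW FILES IN airlines_external;
    DESCRIBE airlines_external;
    DESCRIBE FORMATTED airlines_external;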

Running Queries

Now that we are confident that the connections are solid between the Impala table and the underlying Parquet files, we run some initial queries to understand the characteristics of the data: the overall number of rows, and the ranges and how many different values are in certain columns. For convenience in understanding the magnitude of the COUNT(*) result, we run another query dividing the number of rows by 1 million, demonstrating that there are 123 million rows in the table.
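
A sketch of those first queries; the division by 1e6 is just for readability of the row count:

    SELECT COUNT(*) FROM airlines_external;
    SELECT COUNT(*) / 1e6 AS millions_of_rows FROM airlines_external;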

The NDV() function stands for “number of distinct values”; for performance reasons the result is an estimate when there are lots of different values in the column, but it is precise when the cardinality is less than 16K. Use NDV() calls for this kind of exploration rather than COUNT(DISTINCT colname), because Impala can evaluate multiple NDV() functions in a single query, but only a single instance of COUNT(DISTINCT).
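
A sketch of that kind of query; tail_num, origin, and dest come from the discussion that follows, while carrier and flight_num are assumed names for the airline and flight number columns:

    SELECT NDV(carrier), NDV(flight_num), NDV(tail_num),
           NDV(origin), NDV(dest)
      FROM airlines_external;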

Here we see that there are modest numbers of different airlines, flight numbers, and origin and destination airports. Two things jump out from this query: the number of tail_num values is much smaller than we might have expected, and there are more destination airports than origin airports. Let’s dig further.

What we find is that most tail_num values are NULL. It looks like this was an experimental column that wasn’t filled in accurately. We make a mental note that if we use this data as a starting point, we’ll ignore this column. We also find that certain airports are represented in the DEST column but not the ORIGIN column; now we know that we can’t rely on the assumption that those sets of airport codes are identical.
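
Sketches of the checks behind those findings: a breakdown of tail_num values, then the initial (slow) form of the airport-comparison query:

    SELECT tail_num, COUNT(*) AS howmany
      FROM airlines_external
      GROUP BY tail_num
      ORDER BY howmany DESC
      LIMIT 5;

    SELECT DISTINCT dest FROM airlines_external
      WHERE dest NOT IN (SELECT origin FROM airlines_external);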

(Note: A slight digression for some performance tuning. Notice how the first SELECT DISTINCT DEST query takes almost 40 seconds. We expect all queries on such a small data set, less than 2GB, to take a few seconds at most. The reason is that the expression NOT IN (SELECT origin FROM airlines_external) produces an intermediate result set of 123 million rows, then runs 123 million comparisons on each data node against the tiny set of destination airports. The way the NOT IN operator works internally means that this intermediate result set with 123 million rows might be transmitted across the network to each data node in the cluster. Applying another DISTINCT inside the NOT IN subquery means that the intermediate result set is only 340 items, resulting in much less network traffic and fewer comparison operations. The more efficient query with the added DISTINCT is approximately seven times as fast.)
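
The more efficient form, with the extra DISTINCT inside the subquery:

    SELECT DISTINCT dest FROM airlines_external
      WHERE dest NOT IN (SELECT DISTINCT origin FROM airlines_external);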

Initial Exploration

Next, we try doing a simple calculation, with results broken down by year. This reveals that some years have no data in the AIRTIME column. That means we might be able to use that column in queries involving certain date ranges, but we can’t count on it to always be reliable. The question of whether a column contains any NULL values, and if so what their number, proportion, and distribution are, comes up again and again when doing initial exploration of a data set.
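
A sketch of that per-year check on the AIRTIME column:

    SELECT year, SUM(airtime) AS total_airtime
      FROM airlines_external
      GROUP BY year
      ORDER BY year DESC;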

With the notion of NULL values in mind, let’s come back to the TAIL_NUM column that we discovered had a lot of NULLs. Let’s quantify the NULL and non-NULL values in that column for better understanding.

First, we just count the overall number of rows versus the non-NULL values in that column. That initial result gives the appearance of relatively few non-NULL values, but we can break it down more clearly in a single query. Once we have the COUNT(*) and the COUNT(colname) numbers, we can encode that initial query in a WITH clause, then run a follow-on query that performs multiple arithmetic operations on those values. Seeing that only one-third of one percent of all rows have non-NULL values for the TAIL_NUM column clearly illustrates that the column won’t be of much use.
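
A sketch of that two-step check: first the raw counts, then the same query wrapped in a WITH clause to derive the NULL count and percentage:

    SELECT COUNT(*) AS total_rows, COUNT(tail_num) AS nonnull_tail_num
      FROM airlines_external;

    WITH t1 AS
      (SELECT COUNT(*) AS total_rows, COUNT(tail_num) AS nonnull_tail_num
         FROM airlines_external)
    SELECT total_rows, nonnull_tail_num,
           total_rows - nonnull_tail_num AS null_tail_num,
           nonnull_tail_num / total_rows * 100 AS pct_nonnull
      FROM t1;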

By examining other columns using these techniques, we can form a mental picture of the way data is distributed throughout the table, and which columns are most significant for query purposes. For this tutorial, we focus mostly on the fields likely to hold discrete values, rather than columns such as ACTUAL_ELAPSED_TIME whose names suggest they hold measurements. We would dig deeper into those columns once we had a clear picture of which questions were worthwhile to ask, and what kinds of trends we might look for.

For the final piece of initial exploration, let’s look at the YEAR column. A simple GROUP BY query shows that it has a well-defined range, a manageable number of distinct values, and relatively even distribution of rows across the different years.
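
A sketch of that check on the YEAR column:

    SELECT year, COUNT(*) AS howmany
      FROM airlines_external
      GROUP BY year
      ORDER BY year;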

We could go quite far with the data in this initial raw format, just as we downloaded it from the web. If the data set proved to be useful and worth persisting in Impala for extensive queries, we might want to copy it to an internal table, letting Impala manage the data files and perhaps reorganizing a little for higher efficiency.

Copying the Data into a Partitioned Table

Partitioning based on the YEAR column lets us run queries with clauses such as WHERE year = 2001 or WHERE year BETWEEN 1989 AND 1999, which can dramatically cut down on I/O by ignoring all the data from years outside the desired range. Rather than reading all the data and then deciding which rows are in the matching years, Impala can zero in on only the data files from specific YEAR partitions. To do this, Impala physically reorganizes the data files, putting the rows from each year into data files in a separate HDFS directory for each YEAR value. Along the way, we’ll also get rid of the TAIL_NUM column that proved to be almost entirely NULL.

The first step is to create a new table with a layout very similar to the original AIRLINES_EXTERNAL table. We’ll do that by reverse-engineering a CREATE TABLE statement for the first table, then tweaking it slightly to include a PARTITIONED BY clause for YEAR, and excluding the TAIL_NUM column. The SHOW CREATE TABLE statement gives us the starting point.
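
The starting point:

    SHOW CREATE TABLE airlines_external;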

Although we could edit that output into a new SQL statement, all the ASCII box characters make such editing inconvenient. To get a more stripped-down CREATE TABLE to start with, we restart the impala-shell command with the -B option, which turns off the box-drawing behavior.
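
For example (connection options omitted):

    impala-shell -B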

After copying and pasting the CREATE TABLE statement into a text editor for fine-tuning, we quit and restart impala-shell without the -B option, to switch back to regular output.

Next we run the CREATE TABLE statement that we adapted from the SHOW CREATE TABLE output. We kept the STORED AS PARQUET clause because we want to rearrange the data somewhat but still keep it in the high-performance Parquet format. The LOCATION and TBLPROPERTIES clauses are not relevant for this new table, so we edit those out. Because we are going to partition the new table based on the YEAR column, we move that column name (and its type) into a new PARTITIONED BY clause.
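
A sketch of the adapted statement; only a few of the columns discussed in this tutorial are shown here, since the full list comes straight from the SHOW CREATE TABLE output:

    CREATE TABLE airlines (
      -- non-partition columns copied from the SHOW CREATE TABLE output,
      -- minus tail_num; only a few are shown in this sketch
      origin STRING,
      dest STRING,
      airtime INT,
      actual_elapsed_time INT
      -- ...
    )
    PARTITIONED BY (year INT)
    STORED AS PARQUET;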

Next, we copy all the rows from the original table into this new one with an INSERT statement. (We edited the CREATE TABLE statement to make an INSERT statement with the column names in the same order.) The only changes are to add a PARTITION(year) clause and to move the YEAR column to the very end of the SELECT list of the INSERT statement. Specifying PARTITION(year), rather than a fixed value such as PARTITION(year=2000), means that Impala figures out the partition value for each row based on the value of the very last column in the SELECT list.
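
A sketch of that INSERT, with the same abbreviated column list as the sketch above and YEAR moved to the end of the SELECT list:

    INSERT INTO airlines
      PARTITION (year)
    SELECT
      origin,
      dest,
      airtime,
      actual_elapsed_time,
      -- ... remaining non-partition columns, in the same order as the new table ...
      year
    FROM airlines_external;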

This is the first SQL statement that legitimately takes any substantial time, because the rows from different years are shuffled around the cluster; the rows that go into each partition are collected on one node, before being written to one or more new data files.

Once partitioning or join queries come into play, it’s important to have statistics that Impala can use to optimize queries on the corresponding tables. The COMPUTE INCREMENTAL STATS statement is the way to collect statistics for partitioned tables. Then the SHOW TABLE STATS statement confirms that the statistics are in place for each partition, and also illustrates how many files and how much raw data is in each partition.
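
The statements for that step:

    COMPUTE INCREMENTAL STATS airlines;
    SHOW TABLE STATS airlines;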

At this point, we go through a quick thought process to sanity check the partitioning we did. All the partitions have exactly one file, which is on the low side. A query that includes a clause WHERE year=2004 will only read a single data block; that data block will be read and processed by a single data node; therefore, for a query targeting a single year, all the other nodes in the cluster will sit idle while all the work happens on a single machine. It’s even possible that, by chance (depending on the HDFS replication factor and the way data blocks are distributed across the cluster), multiple year partitions selected by a filter such as WHERE year BETWEEN 1999 AND 2001 could all be read and processed by the same data node. The more data files each partition has, the more parallelism you can get and the lower the probability of “hotspots” occurring on particular nodes, and therefore the bigger the potential performance boost from a large CDH cluster.

However, the more data files, the less data goes in each one. The overhead of dividing the work in a parallel query might not be worth it if each node is only reading a few megabytes. 50 or 100MB is a decent size for a Parquet data block; 9 or 37MB is on the small side. Which is to say, the data distribution we ended up with based on this partitioning scheme is on the borderline between sensible (reasonably large files) and suboptimal (few files in each partition). The way to see how well it works in practice is to run the same queries against the original flat table and the new partitioned table, and compare times.

Spoiler: in this case, with my particular four-node cluster with its specific distribution of data blocks and my particular exploratory queries, queries against the partitioned table do consistently run faster than the same queries against the unpartitioned table. But I could not be sure that would be the case without some real measurements. I drew that conclusion by running the same queries first against AIRLINES_EXTERNAL (no partitioning), then against AIRLINES (partitioned by year); the AIRLINES queries are consistently faster. Changing the volume of data, changing the size of the cluster, running queries that did or didn’t refer to the partition key columns, or other factors could change the results to favor one table layout or the other.
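
A representative pair of such timing comparisons (a sketch, not necessarily the exact queries), one without and one with a restriction on the partition key:

    SELECT SUM(airtime) FROM airlines_external;
    SELECT SUM(airtime) FROM airlines;

    SELECT SUM(airtime) FROM airlines_external WHERE year BETWEEN 1999 AND 2001;
    SELECT SUM(airtime) FROM airlines WHERE year BETWEEN 1999 AND 2001;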

(Note: If you find the volume of each partition is only in the low tens of megabytes, consider lowering the granularity of partitioning. For example, instead of partitioning by year, month, and day, partition by year and month or even just by year. The ideal layout to distribute work efficiently in a parallel query is many tens or even hundreds of megabytes per Parquet file, and the number of Parquet files in each partition somewhat higher than the number of data nodes.)

Diving into Real Analysis

Now we can finally do some serious analysis with this data set which, remember, started out a few minutes ago as a set of raw data files whose columns we didn’t even know. Let’s see whether the “air time” of a flight tends to be different depending on the day of the week. We can see that the average is a little higher on day number 6; perhaps Saturday is a busy flying day and planes have to circle for longer at the destination airport before landing.
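
A sketch of that query; DAYOFWEEK is an assumed name for the day-of-week column:

    SELECT dayofweek, AVG(airtime) AS avg_airtime
      FROM airlines
      GROUP BY dayofweek
      ORDER BY dayofweek;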

To see if the apparent trend holds up over time, let’s do the same breakdown by day of week, but also split up by year. Now we can see that day number 6 consistently has a higher average air time in each year. We can also see that the average air time increased over time across the board. And the presence of NULL for this column in years 1987 to 1994 shows that queries involving this column need to be restricted to a date range of 1995 and higher.
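
A minimal form of the same breakdown split by year, again assuming the DAYOFWEEK column name:

    SELECT year, dayofweek, AVG(airtime) AS avg_airtime
      FROM airlines
      GROUP BY year, dayofweek
      ORDER BY year DESC, dayofweek;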

Conclusion

I hope this use case has provided a good example of how to prepare data originating from outside Impala (and even non-SQL data) for analysis. Crunch away!

John Russell is the technical writer of the Impala docs. He is also the author of the O’Reilly Media book, Getting Started with Impala.
