Using Apache Hadoop and Impala with MySQL for Data Analysis

Thanks to Alexander Rubin of Percona for allowing us to re-publish the post below!

Apache Hadoop is commonly used for data analysis. It is scalable and fast for data loads. In a previous post I showed how to integrate MySQL with Hadoop. In this post I will show how to export a table from MySQL to Hadoop, load the data into Cloudera Impala (in a columnar format), and run reports on top of that. For the examples below, I will use the “ontime flight performance” data from my previous post.

I’ve used Cloudera Manager to install Hadoop and Impala. For this test I’ve (intentionally) used old hardware (servers from 2006) to show that Hadoop can utilize old hardware and still scale. The test cluster consists of 6 datanodes. Below are the specs:

Purpose | Server specs
Namenode, Hive metastore, etc. + Datanodes | 2x PowerEdge 2950, 2x L5335 CPU @ 2.00GHz, 8 cores, 16GB RAM, RAID 10 with 8 SAS drives
Datanodes only | 4x PowerEdge SC1425, 2x Xeon CPU @ 3.00GHz, 2 cores, 8GB RAM, single 4TB drive

 

As you can see, those are pretty old servers; the only thing I’ve changed is that I added a 4TB drive to be able to store more data. Hadoop provides redundancy at the server level (by default it writes 3 copies of each block, spread across different datanodes), so we do not need RAID on the datanodes (we do need redundancy for the namenodes, though).
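
To illustrate why server-level replication can stand in for RAID, here is a minimal Python sketch that places each block on 3 distinct datanodes out of 6 and checks what happens when nodes fail. The node names and the purely random placement are assumptions made for illustration; real HDFS placement also takes racks and free space into account.

```python
import itertools
import random

REPLICATION = 3  # HDFS default replication factor
DATANODES = [f"dn{i}" for i in range(1, 7)]  # hypothetical names for the 6 datanodes


def place_blocks(num_blocks, nodes=DATANODES, replication=REPLICATION, seed=0):
    """Assign each block to `replication` distinct datanodes, chosen at random."""
    rng = random.Random(seed)
    return {block: set(rng.sample(nodes, replication)) for block in range(num_blocks)}


def survives(placement, failed_nodes):
    """Return True if every block still has at least one replica on a live node."""
    failed = set(failed_nodes)
    return all(replicas - failed for replicas in placement.values())


placement = place_blocks(num_blocks=1000)

# Check every possible pair of simultaneously failed datanodes.
two_node_failures_ok = all(
    survives(placement, pair) for pair in itertools.combinations(DATANODES, 2)
)
print("survives any 2 datanode failures:", two_node_failures_ok)
```

Because every block lives on three different servers, losing any two of them still leaves one copy, which is the property RAID would otherwise have to provide on each machine.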

Data Export

There are a couple of ways to export data from MySQL to Hadoop. For the purpose of this test I have simply exported the ontime table into a text file with:

select * into outfile '/tmp/ontime.psv'
FIELDS TERMINATED BY ','
from ontime;

 

(You can use “|” or any other symbol as a delimiter.) Alternatively, you can download data directly from www.transtats.bts.gov using this simple script:

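# Download and unpack one zip archive per month for the years 1988 through 2013
for y in {1988..2013}
do
        for i in {1..12}
        do
                u="http://www.transtats.bts.gov/Download/On_Time_On_Time_Performance_${y}_${i}.zip"
                # -a appends to the log, so output from earlier downloads is not overwritten
                wget "$u" -a ontime.log
                unzip "On_Time_On_Time_Performance_${y}_${i}.zip"
        done
done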
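
Before starting a long download it can help to preview what the loop will request. The following Python sketch only builds the list of URLs using the same pattern as the script; it does not download anything, and the function name is made up for this example.

```python
BASE_URL = (
    "http://www.transtats.bts.gov/Download/"
    "On_Time_On_Time_Performance_{year}_{month}.zip"
)


def ontime_urls(first_year=1988, last_year=2013):
    """Build one URL per month for every year in the inclusive range."""
    return [
        BASE_URL.format(year=year, month=month)
        for year in range(first_year, last_year + 1)
        for month in range(1, 13)
    ]


urls = ontime_urls()
print(len(urls), "files to download")
print(urls[0])
print(urls[-1])
```

With 26 years of 12 monthly files each, the list contains 312 URLs.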

 

Load into Hadoop HDFS

The first thing we need to do is load the data into HDFS as a set of files. Hive or Impala will work with the directory into which you have imported your data and concatenate all files inside this directory. In our case it is easiest to simply copy all our files into a directory inside HDFS.

$ hdfs dfs -mkdir /data/ontime/
$ hdfs -v dfs -copyFromLocal On_Time_On_Time_Performance_*.csv /data/ontime/
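
The idea that a directory of files behaves like one table can be sketched in plain Python. This is only an analogy on the local filesystem, not how Hive or Impala are implemented; the file names and rows below are toy data invented for the example.

```python
import csv
import glob
import os
import tempfile


def read_directory_as_table(directory):
    """Yield every row of every CSV file in `directory` as one logical table."""
    for path in sorted(glob.glob(os.path.join(directory, "*.csv"))):
        with open(path, newline="") as handle:
            yield from csv.reader(handle)


# Toy stand-ins for the monthly files; names and contents are made up.
toy_files = {
    "ontime_1988_1.csv": "1988,1,AA\n1988,1,UA\n",
    "ontime_1988_2.csv": "1988,2,DL\n",
}

with tempfile.TemporaryDirectory() as directory:
    for name, content in toy_files.items():
        with open(os.path.join(directory, name), "w", newline="") as handle:
            handle.write(content)
    rows = list(read_directory_as_table(directory))

print(len(rows), "rows from", len(toy_files), "files")
```

Adding another file to the directory adds its rows to the "table" without any further load step, which is why copying the files into one HDFS directory is enough here.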

 

Create External Table in Impala

Now that all data files are loaded, we can create an external table:

CREATE EXTERNAL TABLE ontime_csv (
YearD int ,
Quarter tinyint ,
MonthD tinyint ,
DayofMonth tinyint ,
DayOfWeek tinyint ,
FlightDate string ,
UniqueCarrier string ,
AirlineID int ,
Carrier string ,
TailNum string ,
FlightNum string ,
OriginAirportID int ,
OriginAirportSeqID int ,
OriginCityMarketID int ,
Origin string ,
OriginCityName string ,
OriginState string ,
OriginStateFips string ,
OriginStateName string ,
OriginWac int ,
DestAirportID int ,
DestAirportSeqID int ,
DestCityMarketID int ,
Dest string ,
...
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/data/ontime';

 

Note the “EXTERNAL” keyword and LOCATION (LOCATION points to a directory inside HDFS, not a file). Impala will only create the metadata (it will not modify the data files). We can query this table right away; however, Impala will need to scan all files (a full scan) for each query.

Example:

[d30.local:21000] > select yeard, count(*) from ontime_csv  group by yeard;
Query: select yeard, count(*) from ontime_csv  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
| 2002  | 5271359  |
| 2004  | 7129270  |
| 1997  | 5411843  |
| 2012  | 6096762  |
| 2005  | 7140596  |
| 1999  | 5527884  |
| 2007  | 7455458  |
| 1994  | 5180048  |
| 2008  | 7009726  |
| 1988  | 5202096  |
| 2003  | 6488540  |
| 1996  | 5351983  |
| 1989  | 5041200  |
| 2011  | 6085281  |
| 1998  | 5384721  |
| 1991  | 5076925  |
| 2006  | 7141922  |
| 1993  | 5070501  |
| 2001  | 5967780  |
| 1995  | 5327435  |
| 1990  | 5270893  |
| 1992  | 5092157  |
| 2000  | 5683047  |
+-------+----------+
Returned 26 row(s) in 131.38s

 

(Note that GROUP BY will not sort the rows, unlike MySQL. To sort we will need to add ORDER BY yeard.)

Explain plan:

Query: explain select yeard, count(*) from ontime_csv  group by yeard
+-----------------------------------------------------------+
| Explain String                                            |
+-----------------------------------------------------------+
| PLAN FRAGMENT 0                                           |
|   PARTITION: UNPARTITIONED                                |
|                                                           |
|   4:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 1                                           |
|   PARTITION: HASH_PARTITIONED: yeard                      |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 4                                        |
|     UNPARTITIONED                                         |
|                                                           |
|   3:AGGREGATE (merge finalize)                            |
|   |  output: SUM(COUNT(*))                                |
|   |  group by: yeard                                      |
|   |                                                       |
|   2:EXCHANGE                                              |
|                                                           |
| PLAN FRAGMENT 2                                           |
|   PARTITION: RANDOM                                       |
|                                                           |
|   STREAM DATA SINK                                        |
|     EXCHANGE ID: 2                                        |
|     HASH_PARTITIONED: yeard                               |
|                                                           |
|   1:AGGREGATE                                             |
|   |  output: COUNT(*)                                     |
|   |  group by: yeard                                      |
|   |                                                       |
|   0:SCAN HDFS                                             |
|      table=ontime.ontime_csv #partitions=1/1 size=45.68GB |
+-----------------------------------------------------------+
Returned 31 row(s) in 0.13s

 

As we can see, it will scan the full 45.68GB of data.
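
The plan above is a two-phase aggregation: each node counts its own rows (1:AGGREGATE), the partial counts are exchanged by a hash on yeard (2:EXCHANGE), and the partial counts are summed (3:AGGREGATE merge finalize) before being gathered (4:EXCHANGE). Here is a minimal Python sketch of that flow on toy data. The per-node row lists are invented, and `year % num_mergers` is a stand-in for whatever hash function Impala actually uses.

```python
from collections import Counter


def local_aggregate(years):
    """1:AGGREGATE: each node counts the rows it scanned, grouped by year."""
    return Counter(years)


def shuffle_by_year(partials, num_mergers):
    """2:EXCHANGE: route each partial count to a merger chosen from the year."""
    inboxes = [[] for _ in range(num_mergers)]
    for partial in partials:
        for year, count in partial.items():
            # Stand-in for a hash function: same year always goes to the same merger.
            inboxes[year % num_mergers].append((year, count))
    return inboxes


def merge_finalize(inbox):
    """3:AGGREGATE (merge finalize): SUM(COUNT(*)) grouped by year."""
    totals = Counter()
    for year, count in inbox:
        totals[year] += count
    return totals


# Toy data: the yeard values scanned by three nodes.
node_scans = [[1988, 1988, 1989], [1989, 1990], [1988, 1990, 1990]]

partials = [local_aggregate(scan) for scan in node_scans]
inboxes = shuffle_by_year(partials, num_mergers=2)

result = {}
for inbox in inboxes:  # 4:EXCHANGE: gather the merged groups in one place
    result.update(merge_finalize(inbox))

print(result)
```

The groups arrive in whatever order the mergers produce them, which is also why the query output is not sorted by year unless an ORDER BY is added.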

Impala with Columnar Format and Compression

A great benefit of Impala is that it supports columnar formats and compression. I’ve tried the new Parquet format with the Snappy compression codec. As our table is very wide (and de-normalized), it will help a lot to use a columnar format. To take advantage of the Parquet format we will need to load data into it, which is easy to do when we already have a table inside Impala and files inside HDFS:

[d30.local:21000] > set PARQUET_COMPRESSION_CODEC=snappy;
[d30.local:21000] > create table ontime_parquet_snappy LIKE ontime_csv STORED AS PARQUET;
[d30.local:21000] > insert into ontime_parquet_snappy select * from ontime_csv;
Query: insert into ontime_parquet_snappy select * from ontime_csv
Inserted 152657276 rows in 729.76s
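
To see why a columnar layout helps a wide table, consider how many values must be read to answer a query that touches a single column. The sketch below is a simplified model in Python with invented toy rows and column names; it ignores compression and Parquet's actual file layout.

```python
COLUMNS = ("yeard", "carrier", "origin", "dest")

# Toy rows; a real ontime row has far more columns.
rows = [
    (1988, "AA", "JFK", "LAX"),
    (1988, "UA", "ORD", "SFO"),
    (1989, "DL", "ATL", "BOS"),
]


def to_columnar(rows, columns):
    """Store each column as its own list instead of storing whole rows."""
    return {name: [row[i] for row in rows] for i, name in enumerate(columns)}


def values_read_row_store(rows):
    """A row store has to read every value of every row, whatever the query."""
    return sum(len(row) for row in rows)


def values_read_column_store(table, needed_columns):
    """A column store only reads the columns the query refers to."""
    return sum(len(table[name]) for name in needed_columns)


table = to_columnar(rows, COLUMNS)

# select yeard, count(*) ... group by yeard only needs the yeard column.
print("row store reads:   ", values_read_row_store(rows), "values")
print("column store reads:", values_read_column_store(table, ["yeard"]), "values")
```

The wider the table, the larger the gap between the two numbers, and values of the same column stored together also tend to compress better.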

 

Then we can test our query against the new table:

Query: explain select yeard, count(*) from ontime_parquet_snappy  group by yeard
+---------------------------------------------------------------------+
| Explain String                                                      |
+---------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                     |
|   PARTITION: UNPARTITIONED                                          |
|                                                                     |
|   4:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 1                                                     |
|   PARTITION: HASH_PARTITIONED: yeard                                |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 4                                                  |
|     UNPARTITIONED                                                   |
|                                                                     |
|   3:AGGREGATE (merge finalize)                                      |
|   |  output: SUM(COUNT(*))                                          |
|   |  group by: yeard                                                |
|   |                                                                 |
|   2:EXCHANGE                                                        |
|                                                                     |
| PLAN FRAGMENT 2                                                     |
|   PARTITION: RANDOM                                                 |
|                                                                     |
|   STREAM DATA SINK                                                  |
|     EXCHANGE ID: 2                                                  |
|     HASH_PARTITIONED: yeard                                         |
|                                                                     |
|   1:AGGREGATE                                                       |
|   |  output: COUNT(*)                                               |
|   |  group by: yeard                                                |
|   |                                                                 |
|   0:SCAN HDFS                                                       |
|      table=ontime.ontime_parquet_snappy #partitions=1/1 size=3.95GB |
+---------------------------------------------------------------------+
Returned 31 row(s) in 0.02s

 

As we can see, it will scan a much smaller amount of data: 3.95GB (with compression) compared to 45GB.

Results:

Query: select yeard, count(*) from ontime_parquet_snappy  group by yeard
+-------+----------+
| yeard | count(*) |
+-------+----------+
| 2010  | 6450117  |
| 2013  | 5349447  |
| 2009  | 6450285  |
...
Returned 26 row(s) in 4.17s

 

And the response time is much better as well.
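
To put numbers on the improvement, the short Python calculation below uses only the figures reported above (table sizes from the explain plans and the two query timings). The variable names are mine.

```python
# Figures taken from the explain plans and query timings above.
TEXT_SIZE_GB = 45.68
PARQUET_SIZE_GB = 3.95
TEXT_QUERY_SECONDS = 131.38
PARQUET_QUERY_SECONDS = 4.17

size_ratio = TEXT_SIZE_GB / PARQUET_SIZE_GB
query_speedup = TEXT_QUERY_SECONDS / PARQUET_QUERY_SECONDS

print(f"Parquet + Snappy table is {size_ratio:.1f}x smaller")
print(f"The same query runs {query_speedup:.1f}x faster")
```

The speedup is larger than the size reduction alone would suggest, which is consistent with the columnar format letting the query read only the yeard column.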

Impala Complex Query Example

I’ve used the complex query from my previous post. I had to adapt it for use with Impala: it does not support the sum(ArrDelayMinutes>30) notation, but sum(if(ArrDelayMinutes>30, 1, 0)) works fine.

select
   min(yeard), max(yeard), Carrier, count(*) as cnt,
   sum(if(ArrDelayMinutes>30, 1, 0)) as flights_delayed,
   round(sum(if(ArrDelayMinutes>30, 1, 0))/count(*),2) as rate
FROM ontime_parquet_snappy
WHERE
   DayOfWeek not in (6,7) and OriginState not in ('AK', 'HI', 'PR', 'VI')
   and DestState not in ('AK', 'HI', 'PR', 'VI')
   and flightdate < '2010-01-01'
GROUP by carrier
HAVING cnt > 100000 and max(yeard) > 1990
ORDER by rate DESC
LIMIT 1000;
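
The sum(if(ArrDelayMinutes>30, 1, 0)) expression is a conditional count: every row contributes 1 when the condition holds and 0 otherwise. The Python sketch below mirrors the cnt, flights_delayed and rate columns of the query on invented toy data; it is an illustration of the logic, not of how Impala evaluates it.

```python
from collections import defaultdict

# Toy (carrier, arrival delay in minutes) pairs; not taken from the real data set.
flights = [
    ("AA", 45), ("AA", 5), ("AA", 31), ("AA", 0),
    ("UA", 10), ("UA", 90),
]


def delay_stats(flights, threshold=30):
    """Return {carrier: (cnt, flights_delayed, rate)} like the query's select list."""
    cnt = defaultdict(int)
    delayed = defaultdict(int)
    for carrier, delay_minutes in flights:
        cnt[carrier] += 1
        # Equivalent of sum(if(ArrDelayMinutes>30, 1, 0))
        delayed[carrier] += 1 if delay_minutes > threshold else 0
    return {
        carrier: (cnt[carrier], delayed[carrier], round(delayed[carrier] / cnt[carrier], 2))
        for carrier in cnt
    }


stats = delay_stats(flights)
for carrier, (total, late, rate) in stats.items():
    print(carrier, total, late, rate)
```

MySQL accepts the shorter sum(ArrDelayMinutes>30) form because a comparison there evaluates to 1 or 0; the if(...) wrapper makes that conversion explicit for Impala.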

 

The query is intentionally designed so that it cannot take advantage of indexes: most of the conditions will only filter out less than 30% of the data.

Impala results:

+------------+------------+---------+----------+-----------------+------+
| min(yeard) | max(yeard) | carrier | cnt      | flights_delayed | rate |
+------------+------------+---------+----------+-----------------+------+
| 2003       | 2009       | EV      | 1454777  | 237698          | 0.16 |
| 2003       | 2009       | FL      | 1082489  | 158748          | 0.15 |
| 2006       | 2009       | XE      | 1016010  | 152431          | 0.15 |
| 2003       | 2009       | B6      | 683874   | 103677          | 0.15 |
| 2006       | 2009       | YV      | 740608   | 110389          | 0.15 |
| 2003       | 2005       | DH      | 501056   | 69833           | 0.14 |
| 2001       | 2009       | MQ      | 3238137  | 448037          | 0.14 |
| 2004       | 2009       | OH      | 1195868  | 160071          | 0.13 |
| 2003       | 2006       | RU      | 1007248  | 126733          | 0.13 |
| 2003       | 2006       | TZ      | 136735   | 16496           | 0.12 |
| 1988       | 2009       | UA      | 9593284  | 1197053         | 0.12 |
| 1988       | 2009       | AA      | 10600509 | 1185343         | 0.11 |
| 1988       | 2001       | TW      | 2659963  | 280741          | 0.11 |
| 1988       | 2009       | CO      | 6029149  | 673863          | 0.11 |
| 2007       | 2009       | 9E      | 577244   | 59440           | 0.10 |
| 1988       | 2009       | US      | 10276941 | 991016          | 0.10 |
| 2003       | 2009       | OO      | 2654259  | 257069          | 0.10 |
| 1988       | 2009       | NW      | 7601727  | 725460          | 0.10 |
| 1988       | 2009       | DL      | 11869471 | 1156267         | 0.10 |
| 1988       | 2009       | AS      | 1506003  | 146920          | 0.10 |
| 1988       | 2005       | HP      | 2607603  | 235675          | 0.09 |
| 2005       | 2009       | F9      | 307569   | 28679           | 0.09 |
| 1988       | 1991       | PA      | 206841   | 19465           | 0.09 |
| 1988       | 2009       | WN      | 12722174 | 1107840         | 0.09 |
+------------+------------+---------+----------+-----------------+------+
Returned 24 row(s) in 15.28s

 

15.28 seconds is significantly faster than the original MySQL results (15 min 56.40 sec without parallel execution and 5 min 47 sec with parallel execution). However, this is not an “apples to apples” comparison:

  • MySQL will scan 45GB of data and Impala with Parquet will only scan 3.95GB.
  • MySQL will run on a single server, Hadoop + Impala will run in parallel on 6 servers.

Nevertheless, Hadoop + Impala shows impressive performance and the ability to scale out of the box, which can help a lot when analyzing large data volumes.
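
For reference, the wall-clock speedups implied by the timings quoted above can be worked out as follows. This is plain arithmetic on the reported numbers, with a small helper of my own to convert minutes and seconds; it does not correct for the differences in data scanned or in the number of servers.

```python
def to_seconds(minutes, seconds):
    """Convert a 'M min S sec' timing into seconds."""
    return minutes * 60 + seconds


IMPALA_SECONDS = 15.28
mysql_single_seconds = to_seconds(15, 56.40)   # without parallel execution
mysql_parallel_seconds = to_seconds(5, 47)     # with parallel execution

speedup_vs_single = mysql_single_seconds / IMPALA_SECONDS
speedup_vs_parallel = mysql_parallel_seconds / IMPALA_SECONDS

print(f"vs. MySQL without parallel execution: {speedup_vs_single:.1f}x")
print(f"vs. MySQL with parallel execution:    {speedup_vs_parallel:.1f}x")
```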

Conclusion

Hadoop + Impala will give us an easy way to analyze large datasets using SQL with the ability to scale even on old hardware.

Alexander joined Percona in 2013. He has worked with MySQL since 2000 as a DBA and application developer. Before joining Percona he did MySQL consulting as a principal consultant for over 7 years (starting with MySQL AB in 2006, then Sun Microsystems, and then Oracle).
