How-to: Convert Existing Data into Parquet

Learn how to convert your data to the Parquet columnar format to get big performance gains.

Using a columnar storage format for your data offers significant performance advantages for a large subset of real-world queries.

Last year, Cloudera, in collaboration with Twitter and others, released a new Apache Hadoop-friendly, binary, columnar file format called Parquet. (Parquet was recently proposed for the ASF Incubator.) In this post, you will get an introduction to converting your existing data into Parquet format, both with and without Hadoop.

Implementation Details

The Parquet format itself is described in the parquet-format specification repository. However, it is unlikely that you’ll actually need that repository. Rather, the code you’ll need is the set of Hadoop connectors in the parquet-mr project.

The underlying implementation for writing data as Parquet requires a subclass of parquet.hadoop.api.WriteSupport that knows how to take an in-memory object and write Parquet primitives through parquet.io.api.RecordConsumer. Currently, there are several WriteSupport implementations, including ThriftWriteSupport, AvroWriteSupport, and ProtoWriteSupport, with more on the way.
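To make that contract concrete, here is a minimal sketch of a custom WriteSupport for a hypothetical two-field record. The SimpleRecord class and its schema are invented purely for illustration, and the package names are the pre-Apache parquet.* ones used at the time:

```java
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import parquet.hadoop.api.WriteSupport;
import parquet.io.api.Binary;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;

// Hypothetical in-memory record type, just for this example.
class SimpleRecord {
  int id;
  String name;
}

public class SimpleWriteSupport extends WriteSupport<SimpleRecord> {
  private static final MessageType SCHEMA = MessageTypeParser.parseMessageType(
      "message SimpleRecord { required int32 id; required binary name (UTF8); }");

  private RecordConsumer consumer;

  @Override
  public WriteContext init(Configuration configuration) {
    // Declare the Parquet schema (and any extra file metadata) up front.
    return new WriteContext(SCHEMA, new HashMap<String, String>());
  }

  @Override
  public void prepareForWrite(RecordConsumer recordConsumer) {
    this.consumer = recordConsumer;
  }

  @Override
  public void write(SimpleRecord record) {
    // Translate one in-memory object into Parquet primitives, field by field.
    consumer.startMessage();
    consumer.startField("id", 0);
    consumer.addInteger(record.id);
    consumer.endField("id", 0);
    consumer.startField("name", 1);
    consumer.addBinary(Binary.fromString(record.name));
    consumer.endField("name", 1);
    consumer.endMessage();
  }
}
```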

These WriteSupport implementations are then wrapped as ParquetWriter objects or ParquetOutputFormat objects for writing as standalone programs or through the Hadoop MapReduce framework, respectively.

Next, I will demonstrate writing Avro objects to Parquet files.

Non-Hadoop (Standalone) Writer

Here is the basic outline for the program:
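What follows is a reconstruction rather than the post’s original listing; it assumes parquet-avro and Avro are on the classpath and uses a made-up two-field schema:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import parquet.avro.AvroSchemaConverter;
import parquet.avro.AvroWriteSupport;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.schema.MessageType;

public class StandaloneAvroToParquet {
  public static void main(String[] args) throws Exception {
    // A made-up Avro schema for illustration.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"AppRecord\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}");

    // Convert the Avro schema to a Parquet schema and build the WriteSupport.
    MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
    AvroWriteSupport writeSupport = new AvroWriteSupport(parquetSchema, avroSchema);

    // Wrap the WriteSupport in a ParquetWriter, choosing a codec and sizes.
    CompressionCodecName codec = CompressionCodecName.SNAPPY;
    int blockSize = 128 * 1024 * 1024;  // row group size
    int pageSize = 1 * 1024 * 1024;
    ParquetWriter<IndexedRecord> writer = new ParquetWriter<IndexedRecord>(
        new Path(args[0]), writeSupport, codec, blockSize, pageSize);

    // Write a record; close() flushes the last row group and the file footer.
    GenericData.Record r = new GenericData.Record(avroSchema);
    r.put("id", 1L);
    r.put("name", "example");
    writer.write(r);
    writer.close();
  }
}
```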

 

Alternatively, we could have just constructed an AvroParquetWriter like so:
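Roughly, and again only as a sketch (it reuses avroSchema, codec, blockSize, pageSize, and the record r from the listing above; AvroParquetWriter builds the AvroWriteSupport internally):

```java
import org.apache.avro.generic.GenericRecord;
import parquet.avro.AvroParquetWriter;

AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(
    new Path(args[0]), avroSchema, codec, blockSize, pageSize);
writer.write(r);
writer.close();
```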

 

but the first version will generalize to other WriteSupports.

Hadoop MapReduce Writer

In this case you must use the ParquetOutputFormat classes. The driver for a MapReduce job that converts Avro to Parquet:
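A sketch of such a driver follows; class names like Avro2Parquet are invented for illustration, and it assumes the new-style mapreduce API, avro-mapred’s AvroKeyInputFormat, and parquet-avro’s AvroParquetOutputFormat:

```java
import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import parquet.avro.AvroParquetOutputFormat;
import parquet.hadoop.metadata.CompressionCodecName;

public class Avro2Parquet extends Configured implements Tool {

  public int run(String[] args) throws Exception {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    // The Avro schema of the input records, here parsed from a file path
    // passed as a third argument (purely illustrative).
    Schema schema = new Schema.Parser().parse(new File(args[2]));

    Job job = Job.getInstance(getConf(), "avro2parquet");
    job.setJarByClass(getClass());

    // Read Avro data files.
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, schema);
    FileInputFormat.addInputPath(job, inputPath);

    // Write Parquet files using the Avro object model; map-only job.
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, schema);
    AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.setMapperClass(Avro2ParquetMapper.class);
    job.setNumReduceTasks(0);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Avro2Parquet(), args));
  }
}
```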

 

The mapper class is extremely simple:
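A sketch matching the driver above; it simply unwraps the AvroKey and emits each record with a null key (ParquetOutputFormat ignores the key):

```java
import java.io.IOException;

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;

public class Avro2ParquetMapper
    extends Mapper<AvroKey<GenericRecord>, NullWritable, Void, GenericRecord> {

  @Override
  protected void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
      throws IOException, InterruptedException {
    // Pass the Avro record straight through to the Parquet output format.
    context.write(null, key.datum());
  }
}
```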

 

You can find the code for the MapReduce Avro-to-Parquet converter here.

Notes on Compression

The Parquet specification allows you to specify a separate encoding/compression scheme for each column individually. However, this feature is not yet implemented on the write path. Currently, choosing a compression scheme will apply the same compression to each column (which should still be an improvement over row-major formats, since each column is still stored and compressed separately). As a general rule, we recommend Snappy compression as a good balance between size and CPU cost.
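In code, the codec is a single choice for the whole file on either write path (SNAPPY, GZIP, LZO, and UNCOMPRESSED are the CompressionCodecName values); for example, reusing the objects from the listings above:

```java
// Standalone: the codec is a ParquetWriter constructor argument.
CompressionCodecName codec = CompressionCodecName.SNAPPY;

// MapReduce: set it on the job via the output format.
AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
```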

Notes on Block/Page Size

The Parquet specification allows you to specify block (row group) and page sizes. The page size refers to the amount of uncompressed data for a single column that is buffered in memory before it is compressed as a unit and written out as a “page”. In principle, the larger the page size, the better the compression should be, though the 1MB default size already starts to achieve diminishing returns. The block size refers to the amount of compressed data (comprising multiple pages from different columns) that should be buffered in memory before a row group is written out to disk. Larger block sizes require more memory to buffer the data; the 128MB default also shows good performance in our experience.
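Both sizes can be set explicitly on the write path. For example, for the MapReduce job above (the standalone ParquetWriter takes the same values as constructor arguments):

```java
import parquet.hadoop.ParquetOutputFormat;

// 128MB row groups and 1MB pages -- the defaults described above.
ParquetOutputFormat.setBlockSize(job, 128 * 1024 * 1024);
ParquetOutputFormat.setPageSize(job, 1024 * 1024);
```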

Impala prefers that a Parquet file contain a single row group (aka a “block”) in order to maximize the amount of data that is stored contiguously on disk. Separately, given a single row group per file, Impala prefers that the entire Parquet file fit into a single HDFS block, in order to avoid network I/O. To achieve that goal with MapReduce, each map task must write only a single row group. Set the HDFS block size to a number greater than the size of the total Parquet output from a single input split; that is, if the HDFS block size is 128MB, then (assuming no compression and that rewriting the data doesn’t change the total size significantly) the Parquet block size should be set slightly smaller than 128MB. The only cost is that the output for the entire input split must be buffered in memory before it is written to disk.
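One way to line these up in the driver above is to request an HDFS block size for the output files that is a bit larger than the Parquet block size. The numbers below are illustrative, and dfs.blocksize is the Hadoop 2 property name:

```java
// Ask HDFS for 128MB blocks on the output files ...
job.getConfiguration().setLong("dfs.blocksize", 128L * 1024 * 1024);
// ... and keep Parquet row groups slightly smaller, so each file fits in one block.
ParquetOutputFormat.setBlockSize(job, 120 * 1024 * 1024);
```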

You should now have a good understanding of how to convert your data to Parquet format. The performance gains are substantial.

Uri Laserson (@laserson) is a data scientist at Cloudera. Questions/comments are welcome. Thanks to the Impala team, Tom White, and Julien Le Dem (Twitter) for help getting up-and-running with Parquet.

 


9 Responses
  • Daniel Gómez / May 19, 2014 / 7:54 AM

    Could you elaborate more on the block size? Why does Impala prefer just one row group per file?

    Thanks

    • Uri Laserson (@laserson) / May 20, 2014 / 5:03 PM

      Impala will read fastest when it can read contiguous data off of the disks. The Parquet format stores column groups contiguously on disk; breaking the file into multiple row groups will cause a single column to store data discontiguously. Therefore, to maximize the size of the column group, you want to have only a single row group. And based on the response to Jakub Kukul, you want to make sure that single row group fits into a single HDFS block, to avoid network I/O.

  • Jakub Kukul / May 19, 2014 / 3:08 PM

    Just double checking – which block size should be larger – HDFS block or parquet ‘block’? You’re saying that parquet ‘block’ should be larger than HDFS block (which makes sense to me), but it’s mentioned the opposite here:

    https://groups.google.com/d/msg/parquet-dev/NZU7FkFD3js/F7sy_xV-jWAJ

    • Uri Laserson (@laserson) / May 20, 2014 / 5:00 PM

      Whoops, it appears we wrote it backwards here (will edit shortly). The ideal is that the HDFS block size and the size of the Parquet file/row group are exactly the same, but this is obviously impossible to achieve. If the size of the Parquet file is larger than the HDFS block size, then reading the full file will require I/O over the network instead of local disk, which is slow. Therefore, you want to make the entire Parquet file fit into the HDFS block (so Parquet block size should be smaller than HDFS block size).

      Put another way, Impala wants to avoid network I/O whenever possible. But also see response to Daniel Gomez.

  • Paul / May 28, 2014 / 8:23 AM

    Hi,
    In the Impala docs it mentions using 1GB hdfs block size (http://www.cloudera.com/content/cloudera-content/cloudera-docs/Impala/latest/Installing-and-Using-Impala/ciiu_parquet.html)

    How does this relate to the 128MB mentioned in this blog post?
    Thanks

  • Sumanth Venkat / May 29, 2014 / 1:26 PM

    Hi Uri,

    Nice article. Could you give some pointers on Parquet taps for scalding? We are using scalding for most of our ETL jobs and we would like to emit files in ParquetFormat for further querying by Impala.

    Thanks,
    Sumanth

    • Uri Laserson (@laserson) / June 11, 2014 / 5:01 PM

      Hi Sumanth,

      I am not very familiar with Scalding/Cascading, unfortunately. I do know there is a parquet-cascading project which might be useful. Good luck!

      Uri

  • Neha / October 13, 2014 / 10:56 PM

    Hi,
    Thanks for this helpful article.
    I could convert my data into parquet but I see that along with the parquet file, corresponding .crc file is created too.
    How can I avoid the creation of this .crc file?

    Thanks,
    Neha

    • Uri Laserson (@laserson) / October 20, 2014 / 9:12 AM

      You probably need to play with some settings. See, for example, HADOOP-7178.
