Parquet at Salesforce.com

The following Parquet blog post was originally published by Salesforce.com Lead Engineer and Apache Pig Committer Prashant Kommireddi (@pRaShAnT1784). Prashant has kindly given us permission to re-publish below. Parquet is an open source columnar storage format co-founded by Twitter and Cloudera.

Parquet is a columnar storage format for Apache Hadoop that uses the concept of repetition/definition levels borrowed from Google Dremel. It provides efficient encoding and compression schemes, the efficiency being improved due to application of aforementioned on a per-column basis (compression is better as column values would all be the same type, encoding is better as values within a column could often be the same and repeated). Here is a nice blog post from Julien Le Dem of Twitter describing Parquet internals.

Parquet can be used by any project in the Hadoop ecosystem, there are integrations provided for MR, Pig, Hive, Cascading, and Cloudera Impala.

I am by no means an expert at this, and a lot of what I write here is based on my conversations with a couple of key contributors on the project (@J_ and @aniket486). Also, most of the content mentioned on this post is based on Pig+Parquet integration. We at Salesforce.com have started using Parquet for application logs processing with Pig and are encouraged with the preliminary performance results.

Writing a Parquet File

There is parquet.hadoop.ParquetWriter. You need to decide which ObjectModel you want to use. It could be Thrift, Avro, Pig, or the example model. Here is a function for writing a file using the Pig model (TupleWriteSupport):

 

“pigSchemaString” is the schema for the parquet file. This could be any valid pig schema, such as “a:int, b:int, c:int”. Note that I insert integer values in the tuple and hence schema fields are defined to be int.

So what exactly happened during the write? I use TupleWriteSupport which is a WriteSupport implementation that helps us write parquet files compatible with Pig. I then use ParquetWriter passing in a few arguments:

  • path – file path to write to

  • writeSupport – TupleWriteSupport in this case

  • compressionCodecName – could be UNCOMPRESSED, GZIP, SNAPPY, LZO

  • blockSize – block size which is 128M by default. Total size used by a block

  • pageSize –  from the parquet docs: “pages should be considered indivisible so smaller data pages allow for more fine grained reading (e.g. single row lookup). Larger page sizes incur less space overhead (less page headers) and potentially less parsing overhead (processing headers). Note: for sequential scans, it is not expected to read a page at a time; this is not the IO chunk.  We recommend 8KB for page sizes.” Default page size is 1MB

  • enableDictionary – turn on/off dictionary encoding

At the end, I create tuples with a few elements and write to the parquet file.

Reading a Parquet File

There is a bug due to which trying to read Parquet files written using the Pig model (TupleWriteSupport) directly with ParquetReader fails. See https://github.com/Parquet/parquet-mr/issues/195

However, you can use Pig to read the file.

 

Initial Findings

I ran the PerfTest and noticed it takes longer to read 1 column and takes progressively less for more columns.

 

That’s the JVM warmup phase!

The JIT compiler will inline methods based on how often they are called. So it waits before doing so to see what gets called often. Julien suggested I run it twice in a row (in the same process) and compare the times. This is generic and nothing in particular to Parquet, but I wanted to highlight in case you happen to run into similar perf numbers.

Voila! The 2nd iteration did provide better results.

 

I ran another test – reading a regular text file vs parquet file. The file contained 20M rows and 6 columns. The results didn’t seem right, both storage and processing-wise. Was I missing something?

 

By default compression is not enabled, which is why the Parquet file is larger (footers, headers, summary files take up additional space. Note Parquet stores information regarding each page, column chunk, file to be able to determine the exact pages that need to be loaded by a query. You can find additional info here).

Also if you are reading all the columns, it is expected that the columnar format will be slower as row storage is more efficient when you read all the columns. Project fewer columns and you should find a difference. You should see a projection pushdown message in the logs.

Yes you need bigger files to get benefits from the columnar storage!

At this point, I wanted to try out encoding and see how that plays out on the overall storage. My next question – Are different encoding (RLE, dictionary) to be provided by the client, or does Parquet figure out the right one to use based on the data? Turns out Parquet will use the dictionary encoding if it can but right now you need to turn that on:

 

Finally did get some nice results after enabling dictionary encoding and filtering on a single column. It was a lot better storage wise too once Dictionary Encoding was enabled. (The following was run on a larger dataset)

 

Here are the numbers from 2nd iteration – just to negate the effects of JVM warmup, and to be fair to text-row format :)

 

Schema Management

Parquet can handle multiple schemas. This is important for our use-case at SFDC for log processing. We have several different types of logs, each with its own schema, and we have a few hundred of them. Most pig queries run against a few log types. Parquet merges schema and provides the ability to parse out columns from different files.

  • LogType A : organizationId, userId, timestamp, recordId, cpuTime
  • LogType V : userId, organizationId, timestamp, foo, bar

A query that tries to parse the organizationId and userId from the 2 logTypes should be able to do so correctly, though they are positioned differently in the schema. With Parquet, it’s not a problem. It will merge ‘A’ and ‘V’ schemas and project columns accordingly. It does so by maintaining a file schema in addition to merged schema and parsing the columns by referencing the 2.

Projection Pushdown

One of the advantages of a columnar format is the fact that it can read only those parts from a file that are necessary. The columns not required are never read, avoiding unnecessary and expensive I/O.

For doing this in Pig, just pass in the required schema in to the constructor of ParquetLoader.

 

The above query loads columns ‘a’ and ‘b’ only. When you do so, you should find a message similar to the following in logs

Assembled and processed 4194181 records from 2 columns in 2124 ms: 1974.6615 rec/ms, 1974.6615 cell/ms

If you hadn’t done that, a file containing 16 columns would all be loaded

Assembled and processed 4194181 records from 16 columns in 6313 ms: 664.3721 rec/ms, 10629.953 cell/ms

Summary Files

Parquet generates a summary file for all part files generated under a directory (job output). The summary file reduces the number of calls to the namenode and individual slaves while producing the splits which reduces the latency to start a job significantly. Otherwise it will have to open the footer of every part file which occasionally is slowed down by the namenode or a bad slave that we happen to hit. Reading one summary file reduces the risks to hit a slow slave and the load on the namenode.

For example, if the output directory to which Parquet files are written by a Pig script is ‘/user/username/foo’.

 

This will create part files under ‘foo’, the number of these part files depends on the number of reducers.

 

The summary file is generated when the hadoop job writing the files is finished as it is in the outputCommitter of the output format (ParquetOutputCommitter.commitJob). It reads all footers in parallel and creates the summary file so all subsequent “LOAD” or reads on the directory ‘foo’ could be more efficient.

There is one summary file for all the part files output by the same job. That is, one per directory containing multiple part files.

Hadoop Compatibility

Anyone who has been a part of a major hadoop upgrade should be familiar with how painful the process can be. At SFDC, we moved from a really old version 0.20.2 to 2.x (recently declared GA). This involved upgrading a ton of dependencies, making client side changes to use the newer APIs, bunch of new configurations, and eliminating a whole lot of deprecated stuff. Though this was a major upgrade and most upgrades here on should be smooth(er), it always helps if dependent and 3rd party libraries don’t need to be recompiled.

With Parquet, you should not need to re-compile for hadoop 2. It hides all the hadoop 2 incompatibilities behind reflective calls so the same jars will work.

And Finally…

We at Salesforce.com have been early adopters of several big data open source technologies. Hadoop, Pig, HBase, Kafka, Zookeeper, Oozie to name a few either have been or are in the process of making it to production. Phoenix, a SQL layer on top of HBase, is a project that was homegrown and is now open-sourced. Parquet is the latest addition, and we are looking forward to using it for more datasets (Oracle exports for example) in the near future and not just application logs. The Parquet community is helpful, open to new ideas and contributions, which is great for any open source project.

Thanks, Prashant!

Filed under:

No Responses

Leave a comment


+ 1 = five