How-to: Process Time-Series Data Using Apache Crunch

Did you know that using the Crunch API is a powerful option for doing time-series analysis?

Apache Crunch is a Java library for building data pipelines on top of Apache Hadoop. (The Crunch project was originally founded by Cloudera data scientist Josh Wills.) Developers can spend more time focused on their use case by using the Crunch API to handle common tasks such as joining data sets and chaining jobs together in a pipeline. At Cloudera, we are so enthusiastic about Crunch that we have included it in CDH 5! (You can get started with Apache Crunch here and here.)

Crunch is also well suited to transforming and analyzing time-series data. In this post, I will provide a simple example to help you get started with Crunch for that use case.

Using the Power of Avro + Crunch

Crunch provides an API that makes it easy to sort, group, iterate, and apply custom processing logic to time-series data.

You can explore this API with an example Crunch job. The sample code below loads Apache Avro files containing records of stock trades, adds a trade sequence number for each trade of each stock symbol, and writes the appended records back to new Avro files. While this is only a simple calculation, you can use the same structure to form much more involved time-series routines. (The code for this example can be found online here.)

First, specify the schema you will use to process the Avro data. This does not need to exactly match the schemas in the Avro files; compatible differences are handled by Avro schema resolution. In fact, here you specify an optional sequence number field that does not exist in the input data, but that will be populated by the Crunch job and included in the output files. This is a glimpse into the power of schema evolution when using Avro.
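
As an illustration, a reader schema along these lines would fit the description. The record name and all field names (Trade, stock_symbol, trade_time, trade_price, sequence_num) are hypothetical and not taken from the original example code. The part that matters is the last field: a union with "null" listed first and a null default, which lets Avro supply a value when reading files written without that field. The sketch keeps the schema as a plain Java string, which could then be handed to Avro's Schema.Parser.

```java
// Hypothetical reader schema for the trade records; every name here is an assumption.
final class TradeSchema {
    static final String JSON =
        "{\n" +
        "  \"type\": \"record\",\n" +
        "  \"name\": \"Trade\",\n" +
        "  \"fields\": [\n" +
        "    {\"name\": \"stock_symbol\", \"type\": \"string\"},\n" +
        "    {\"name\": \"trade_time\", \"type\": \"long\"},\n" +
        "    {\"name\": \"trade_price\", \"type\": \"double\"},\n" +
        // Optional field: absent from the input files, populated by the job.
        "    {\"name\": \"sequence_num\", \"type\": [\"null\", \"int\"], \"default\": null}\n" +
        "  ]\n" +
        "}";
}
```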


Next, reference the trade Avro files on HDFS as a PCollection of Avro generic records with the above schema.


From the collection of all trades for the day, group the trades by stock symbol so that you can sequence the trades of each stock. You cannot assume that the trades for each stock symbol arrive already in time order, so you also need to sort them. Crunch includes the SecondarySort class to do just that, provided you first structure the data so that it knows which field to group by and which field to sort by.
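
The shape SecondarySort expects is a table keyed by the grouping field, whose values are pairs of (sort field, record). The following is a plain-Java model of that keying step, not Crunch code: Map.Entry stands in for Crunch's Pair, a Map<String, Object> stands in for the Avro generic record, and the field names stock_symbol and trade_time are assumptions.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Plain-Java model of keying a trade for a secondary sort (not the Crunch API).
final class TradeKeys {
    // Returns (symbol, (tradeTime, trade)): group by the outer key,
    // sort by the first element of the inner pair.
    static Map.Entry<String, Map.Entry<Long, Map<String, Object>>> key(Map<String, Object> trade) {
        String symbol = (String) trade.get("stock_symbol"); // hypothetical field name
        Long time = (Long) trade.get("trade_time");         // hypothetical field name
        Map.Entry<Long, Map<String, Object>> sortable = new SimpleImmutableEntry<>(time, trade);
        return new SimpleImmutableEntry<>(symbol, sortable);
    }
}
```

In a Crunch job, a function of this kind would typically be applied to every record with parallelDo to produce the keyed table.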


You can now use SecondarySort to provide the trades for each stock symbol in time order. Use the Crunch DoFn class to iterate through the sorted trades, applying an incrementing sequence number as you go. The process method is called once per stock symbol and receives that symbol's trades in sorted order. The result is a PCollection of Avro records with the same schema as the input data, but in this case the sequence number field is populated with values.
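
To make the sequencing logic concrete, here is a plain-Java sketch of what the job computes, without any Crunch or Hadoop dependency. It groups trades by symbol, sorts each group by time, and numbers the trades within each group. The field names and the choice to start numbering at 1 are assumptions; in the Crunch job the grouping and sorting are done by SecondarySort, and only the inner numbering loop would live in the DoFn.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the per-symbol sequencing (not the Crunch API).
final class TradeSequencer {
    // Convenience builder for a trade record; field names are hypothetical.
    static Map<String, Object> trade(String symbol, long time) {
        Map<String, Object> t = new HashMap<>();
        t.put("stock_symbol", symbol);
        t.put("trade_time", time);
        return t;
    }

    static List<Map<String, Object>> sequence(List<Map<String, Object>> trades) {
        // Group by symbol; a TreeMap keeps the output order deterministic.
        Map<String, List<Map<String, Object>>> bySymbol = new TreeMap<>();
        for (Map<String, Object> t : trades) {
            String symbol = (String) t.get("stock_symbol");
            bySymbol.computeIfAbsent(symbol, k -> new ArrayList<>()).add(t);
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (List<Map<String, Object>> group : bySymbol.values()) {
            // Sort each symbol's trades by time (the secondary sort).
            group.sort(Comparator.comparingLong(
                (Map<String, Object> t) -> (Long) t.get("trade_time")));
            int sequence = 1; // assumption: numbering starts at 1
            for (Map<String, Object> t : group) {
                Map<String, Object> copy = new HashMap<>(t); // leave the input untouched
                copy.put("sequence_num", sequence++);
                out.add(copy);
            }
        }
        return out;
    }
}
```

For example, given trades AAA at time 30, BBB at time 5, and AAA at time 10, the result lists AAA at time 10 with sequence 1, AAA at time 30 with sequence 2, and BBB at time 5 with sequence 1.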


With the processing complete, you can simply write the resulting Avro records to HDFS. Alternatively, the sequencedTrades object could be used as an input to subsequent stages of the same Crunch pipeline.



As you have seen here, Crunch can be a powerful tool for processing time-series data. Developing this same example with the MapReduce API would require considerably more boilerplate code to sort the values and handle Avro data types. A simple example like this could also be written with SQL windowing functions; however, more complex logic or data types are often beyond the capabilities of SQL.

We at Cloudera encourage data pipeline developers to consider Crunch for their data processing routines, especially for time-series transformation and analysis.

Jeremy Beard is a Solutions Architect at Cloudera.
