How-to: Build Advanced Time-Series Pipelines in Apache Crunch

Categories: Graph Processing How-to

Learn how creating dataflow pipelines for time-series analysis is a lot easier with Apache Crunch.

In a previous blog post, I described a data-driven market study based on Wikipedia access data and content. I explained how useful it is to combine several public data sources, and how this approach sheds light onto the hidden correlations across Wikipedia pages.

One major task in the above was to apply structural analysis to networks reconstructed by time-series analysis techniques. In this post, I will describe a different method: the use of Apache Crunch time-series processing pipelines for large-scale correlation and dependency analysis. The results of these operations will be a matrix or network representation of the underlying data set, which can be further processed in an Apache Hadoop (CDH) cluster via GraphLab, GraphX, or even Apache Giraph.

This article assumes that you know a bit about Crunch. If not, read the Crunch user guide first. Furthermore, this short how-to explains how to extract and re-organize data that is already stored in Apache Avro files, using a Crunch pipeline. All source code for the article is available in the crunch.TS project.

Initial Situation and Goal

In our example dataset, for each measurement period, one SequenceFile was generated. Such a file is called a “time-series bucket” and contains a key-value pair of types: Text (from Hadoop) and VectorWritable (from Apache Mahout). We use data types of projects, which do not guarantee stability over time, and we are dependent on Java as a programming language because others cannot read SequenceFiles.

The dependency on external libraries, such as the VectorWritable class from Mahout, should be removed from our data representation and storage layer, so it is a good idea to store the data in an Avro file. Such files can also be organized in a directory hierarchy that fits to the concept of Apache Hive partitions. Data processing will be done in Crunch, but for fast delivery of pre-calculated results, Impala will be used.

A more general approach will be possible later on if we use Kite SDK and Apache HCatalog, as well. In order to achieve interoperability between multiple analysis tools or frameworks — and I think this is a crucial aspect in data management, even in the case of an enterprise data hub — you have to think about access patterns early.

Preparing the Data

In general, two types of input data formats are possible for time series. Digital measurement procedures, applied to properties of a system that can be described as continuous functions, lead to equidistant time series with a well-known, usually constant, sampling rate. Data obtained from human interaction, such as social network activity or even email communication — is not of such a type. Rather, such data can be seen as an ordered collection of time-stamped values. The higher the event density or the shorter the recurrence intervals, the more event time series can be transformed into equidistant time series (and vice-versa).

Figure 1: Example of (a) continuous time series and (b) event time series. (See full presentation here. Results of the fluctuation analysis and return interval statistics applied to the Wikipedia click statistics data set were published here.)

There are two obvious ways to create time-series records, pairs, and tuples:

  • Direct measurement by external devices and data delivery via Flume, maybe into HBase, or into HDFS as plain text files, SequenceFiles, or Avro files. (See “Part 1” in Figure 2.)
  • Using data aggregation procedures such as sessionization or reorganization of data sets (more on that later). Joins and up to two cross-joins are required to group time series for multivariate analysis. (See “Part 2” in Figure 2.)

Figure 2: Time-series bucket creation and bucket analysis (univariate or multivariate) leads to network representation of the underlying system.

(Re)using a POJO as Custom Data Type for Time Series

For fast, easy data serialization, use the reflects method of the Avros class. If the POJO has a parameter-less constructor and all fields have an Avro equivalent (find more details on Avro types in the Avro documentation), you do not need an additional wrapper class.


Listing 1: A POJO is used to store continuous equidistant time series data.

In other cases, such as if complex data types or nested objects have to be serialized, a custom PType class is required and can be generated by the Avros.records()method.

Custom PTypes for Event Time Series

The second type of time series consists of events, in which a set of timestamped values is stored together with some metadata about the event. We use a string, which is interpreted as a unique identifier or a de-referenceable URI that points to a web location, which provides more background information about an individual event. (The latter apprach follows the W3C recommendation for building a linked data web that is based on work from Tim Berners-Lee.)

As a start, time-series conversion will be the core functionality; we’ll add the metadata lookup feature later.

We start with the following Avro schema stored in file eventts.avsc:


Listing 2: Avro schema evolution allows you to add more fields while the data maintains compatibility.

An interesting and helpful exercise is to examine the Avro’s schema evolution. Listing 2 shows the original schema, with recently added fields highlighted. Such changes do not affect data accessibility like it would if data were stored in SequenceFiles.

Using the Eclipse Maven plugin, Avro schema files are processed and the two Java classes EventTSContainer and Event are generated in the source folder. Those classes are now used to map the EventTS into an Avro record.


Listing 3: The class EventTS uses fields that do not have an Avro equivalent, so we have to implement a bidirectional mapping between the generated Avro record and the POJO.


Listing 4: ETL steps of a time-series processing pipeline


Listing 5: A POJO-like EventTS with complex data types is mapped to an Avro record, EventTSRecord, which was generated from an Avro schema (Listing 2) to return it as a result of a DoFn.

Now we want to discuss the runtime behavior of the Crunch pipeline, which we just created using the CrunchTool class instead of extending Configured and implementing Tool from the Hadoop API. Doing that allows us to call some convenient methods for reading and writing files.

Listing 6 shows the core of our CrunchTool subclass, which applied some simple aggregation functions to the data set.


Listing 6: A Crunch pipeline: MapReduce in four acts

This code triggers four individual MapReduce jobs. Let’s investigate the output:


(The last job writes the Avro file back to disk without any comments.)

It looks like we have to read the same input four times during four different jobs. However, although each groupByKey operation unavoidably kicks off its own MR job, some of them – such as max() and min() — can be merged as shown below:


Listing 7: Optimization of a Crunch pipeline is done with custom code: a pair aggregator reduces the number of jobs which have to be executed — max() and min() can now be calculated in one turn.

This issue is not specific to the MapReduce pipeline. Even in Spark, four passes would be required (unless custom-coded merging is done).

Multivariate Time-Series Analysis

After these initial steps of data conversion and shaping of raw data, another important preparation step is required: MapReduce procedures have to “share nothing.” That means, while one record is processed, in general, no data about other records is available.

For correlation analysis, we need all possible pairs of records or time series. Therefore, we have to create new records, which are time-series pairs — or in dependency analysis, even time-series triples.

Forming such pairs and triples is easy, but the heaviest operation, a cross-join, has to be executed twice in the case of triples. Such intermediate data sets are relatively large, so it is important to find out how often each time-series bucket will be processed. If fewer than three times, a materialization of the records in HDFS might not be required, and one can do the processing steps that extract new information or calculate new data from the pairs or triples directly within this pipeline.

The result will be a small data set that only contains pair IDs or even the element IDs together with a set of values. This structure looks like an edge list and can directly be visualized as a graph via Gephi; even graph processing can be done using GraphLab or Giraph on your Hadoop cluster. (See my previous post for an explanation.)

The analysis pipeline has to load the data set from a SequenceFile, and then we apply the cross-join twice. As we do not need to store all intermediate pairs, only two MapReduce jobs are required. During the first job, cross-correlation analysis is done via the CCA class and the dependency networks are created in the second job using the DEP class. Both classes extend DoFn from the Crunch API and use existing code from Hadoop.TS.


Listing 8: A Crunch-based tool to create time-series pairs and triples for correlation and dependency analysis.

Output is as follows:


Now we can focus on more advanced functionality and real time-series analysis (or even network analysis, which starts with reconstructed networks).


Implementations of advanced time-series algorithms require flexible and reliable data structures that can be converted from format into others and that support fundamental consistency checks, like comparison of ranges and sampling rates. This helps you avoid unnecessary workloads in Hadoop clusters, and supports the quality and reliability of processing procedures. For now, Avro is an efficient and flexible storage format for Crunch applications, with Apache Parquet (currently incubating) being a valid alternative — especially since native Parquet support is also available for Hive.

You should now appreciate how easily time-series data can be converted between multiple formats, and how useful Crunch pipelines are for dataflow management. This way, advanced analytics is even independent from a specific Hadoop processing layer, which can be either traditional MapReduce framework, the Spark engine, or in case of algorithmic prototyping, even a local workstation if a Crunch in memory pipeline is used.

Have fun and happy Crunch-ing!

Mirko Kämpf is the lead instructor for the Cloudera Administrator Training for Apache Hadoop for Cloudera University.