**Time-series analysis is becoming mainstream across multiple data-rich industries. The new spark-ts library helps analysts and data scientists focus on business questions, not on building their own algorithms.**

Have you ever wanted to build models over measurements coming in every second from sensors across the world? Dig into intra-day trading prices of millions of financial instruments? Compare hourly view statistics across every page on Wikipedia? Each of these tasks means working with large sequences of measurements taken over time.

Large-scale time-series data shows up across a variety of domains. In this post, I’ll introduce Time Series for Spark (distributed as the `spark-ts` package), a library developed by Cloudera’s Data Science team (and in use by customers) that enables analysis of data sets comprising millions of time series, each with millions of measurements. The package runs atop Apache Spark, and exposes Scala and Python APIs.

First, I’ll cover what time-series data is, why you should care about it, and how to think about distributing it across machines in a cluster. If you’re impatient to actually use the `spark-ts` library, jump straight to the end where we demo the Scala API.

## What is Time-Series Data?

As foreshadowed in the introduction, time-series data consists of sequences of measurements, each occurring at a point in time.

A variety of terms are used to describe time-series data, and many of them apply to conflicting or overlapping concepts. In the interest of clarity, in `spark-ts`, we stick to a particular vocabulary:

- A *time series* is a sequence of floating-point values, each linked to a timestamp. In particular, we try as hard as possible to stick with “time series” as meaning a *univariate* time series, although in other contexts it sometimes refers to a series with multiple values at the same timestamp. In Scala, a time series is usually represented by a Breeze vector, and in Python, a 1-D NumPy array, and has a DateTimeIndex somewhere nearby to link its values to points in time.
- An *instant* is the vector of values in a collection of time series corresponding to a single point in time. In the `spark-ts` library, each time series is typically labeled with a *key* that enables identifying it among a collection of time series.
- An *observation* is a tuple of (timestamp, key, value), i.e. a single value in a time series or instant.
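To make the three terms concrete, here is a minimal plain-Scala sketch that does not use `spark-ts` at all. The names `Observation`, `seriesFor`, `instantAt`, and the sample keys "A" and "B" are illustrative assumptions, not part of the library.

```scala
import java.time.LocalDate

object Vocabulary {
  // One observation: a single (timestamp, key, value) tuple.
  final case class Observation(timestamp: LocalDate, key: String, value: Double)

  // A univariate time series for one key: its values ordered by timestamp.
  def seriesFor(obs: Seq[Observation], key: String): Vector[Double] =
    obs.filter(_.key == key).sortBy(_.timestamp.toEpochDay).map(_.value).toVector

  // An instant: the value of every key at one timestamp.
  def instantAt(obs: Seq[Observation], timestamp: LocalDate): Map[String, Double] =
    obs.filter(_.timestamp == timestamp).map(o => o.key -> o.value).toMap

  // Hypothetical sample data: two keys observed on two days.
  val sample: Seq[Observation] = Seq(
    Observation(LocalDate.of(2015, 8, 3), "A", 1.0),
    Observation(LocalDate.of(2015, 8, 3), "B", 10.0),
    Observation(LocalDate.of(2015, 8, 4), "A", 2.0),
    Observation(LocalDate.of(2015, 8, 4), "B", 20.0)
  )
}
```

With this sample, `seriesFor(sample, "A")` reads down one key, while `instantAt(sample, LocalDate.of(2015, 8, 4))` reads across all keys at one timestamp.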

Not all data with timestamps is time-series data. For example, logs don’t fit directly into the time-series mold because they consist of discrete events, not scalar measurements taken at intervals. However, measurements of log-messages-per-hour would constitute a time series.

## Use Cases

In many cases, we do the same things with time-series data that we do with other types of data: We group and filter it, we compute basic summary statistics, we run regressions using each instant as a sample. Time is, of course, ubiquitous, and many data sets that we don’t specifically consider “time-series data” still qualify.

However, certain operations and patterns of analysis are specific to time-series data. For example, with a typical data set, we might impute missing values by looking at the mean across an entire column, but in time-series data, it often makes sense to use the data points that are adjacent in time. As another example, we might want to down-sample a data set with daily measurements to compute weekly sums.
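Both operations can be sketched in a few lines of plain Scala. This is only an illustration of the ideas: `fillPrevious` and `downsampleSum` are hypothetical helper names, and missing values are assumed to be encoded as `NaN`.

```scala
object Munging {
  // Replace each NaN with the most recent non-NaN value before it.
  // Leading NaNs have no earlier neighbor, so they stay NaN.
  def fillPrevious(values: Vector[Double]): Vector[Double] =
    values.scanLeft(Double.NaN)((last, v) => if (v.isNaN) last else v).tail

  // Down-sample by summing consecutive windows, e.g. 7 daily values -> 1 weekly sum.
  // A trailing partial window is summed as-is.
  def downsampleSum(values: Vector[Double], window: Int): Vector[Double] =
    values.grouped(window).map(_.sum).toVector
}
```

Note that both functions depend on the order of the values, which is exactly what sets them apart from column-wise operations such as a global mean.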

From a statistical modeling perspective, the most interesting thing about time-series data is its dependency structure. As long as we’re not stuck inside the plot of Primer, the value of a series at a given point in time can be affected only by simultaneous or prior values. Many time-series models rely on even more specific dependency assumptions. For example, autoregressive (AR) models express the value of a series at each time point as a linear function of values at immediately previous time points. In seasonal models, the value is affected by a periodic function spanning the series.
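For reference, an AR model of order $p$ is conventionally written as follows. The symbols are standard textbook notation rather than anything defined by `spark-ts`: $c$ is a constant, $\phi_i$ are the autoregressive coefficients, and $\varepsilon_t$ is a noise term.

$$x_t = c + \sum_{i=1}^{p} \phi_i \, x_{t-i} + \varepsilon_t$$

Only earlier values $x_{t-1}, \dots, x_{t-p}$ appear on the right-hand side, which is the dependency structure described above.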

The `spark-ts` library supports these time series-specific munging patterns and offers a variety of statistical functionality for modeling time-series data.

## Laying Out Time-Series Data in Memory

The layout of a time-series data set determines what questions are easy to ask about it, and how quickly the answers to those questions can be computed. In Spark, we work with RDDs and DataFrames, which are both collections of objects partitioned across a set of machines in the cluster. Thus when choosing the layout of a data set, the most important question is “What data do we store in each of these objects?” With time-series data, there are a few natural options for answering that question, which we’ll refer to as *observations*, *instants*, and *time series*.

### Observations DataFrame

One layout option is to think of your data as a table with three columns: timestamp, key, and value. In the Spark world, you can represent this concept with a DataFrame. With this representation, if you have X timestamps and Y variables (keys), your table will have X * Y records. In Spark-TS parlance, this is an “observations” layout.

An advantage of this layout is that one can append data without needing the full vector of values at a particular timestamp—each row only includes a single scalar value. You also don’t need to worry about changing your schema when you add more keys.

However, the observations layout is not ideal for performing analysis. To ask most questions, you need to perform a `group by` on your data set, either by key or by timestamp, which is cumbersome as well as computationally expensive.

### Instants DataFrame

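In the second layout, each row holds one instant: a timestamp together with the value of every series at that time. The “instants” layout is ideal for much of traditional machine learning, for example when you want to build a supervised learning model that predicts one variable based on contemporaneous values of the others.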

### TimeSeriesRDD

In the third layout, which is most central to `spark-ts`, each object in the RDD stores a full univariate series.

With this layout, the operations that tend to apply exclusively to time series are much more efficient. For example, if you want to generate a set of lagged time series from your original collection of time series, each lagged series can be computed by only looking at a single record in the input RDD. Similarly, when imputing missing values based on surrounding values, or fitting time-series models to each series, all the data needed is present in a single array.
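The lagging case can be sketched without Spark, using a plain `Map` as a stand-in for the distributed collection. `lag` and `lagAll` are hypothetical names for illustration, and padding with `NaN` is an assumption of this sketch.

```scala
object Lagging {
  // Shift a series back by k steps: element t of the result is element t - k
  // of the input. The first k positions have no source value and become NaN.
  def lag(series: Vector[Double], k: Int): Vector[Double] = {
    require(k >= 0, "lag must be non-negative")
    Vector.fill(math.min(k, series.length))(Double.NaN) ++ series.dropRight(k)
  }

  // Each record is (key, full series), so lagging the whole collection is a
  // per-record map with no grouping step.
  def lagAll(collection: Map[String, Vector[Double]], k: Int): Map[String, Vector[Double]] =
    collection.map { case (key, series) => key -> lag(series, k) }
}
```

Because every record already holds its entire series, the same per-record function could be passed to an RDD `map` without any shuffle.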

The library also contains utilities for converting data sets between these representations.

## Coding Against the Spark-TS API

The code, data, and surrounding Maven project for the following example are included in the spark-ts-examples repo.

The most straightforward way to access Spark-TS from Scala is to depend on it in a Maven project. Do that by including the following repo in the `pom.xml`:

```xml
<repositories>
  <repository>
    <id>cloudera-repos</id>
    <name>Cloudera Repos</name>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>
```

Then add the following dependency to the `pom.xml`:

```xml
<dependency>
  <groupId>com.cloudera.sparkts</groupId>
  <artifactId>sparkts</artifactId>
  <version>0.1.0</version>
</dependency>
```

Alternatively, to access it in a spark-shell, download the JAR, and then launch the shell with:

```shell
spark-shell \
  --jars sparkts-0.1.0-jar-with-dependencies.jar \
  --driver-class-path sparkts-0.1.0-jar-with-dependencies.jar
```
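For sbt-based projects, the Maven repository and coordinates above should translate roughly to the following `build.sbt` lines. This is an untested sketch: it assumes the artifact is published without a Scala-version suffix, which is why it uses a single `%`.

```scala
// build.sbt (sketch): same repository and coordinates as the Maven snippets
resolvers += "Cloudera Repos" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "com.cloudera.sparkts" % "sparkts" % "0.1.0"
```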

The central abstraction of the `spark-ts` library is the TimeSeriesRDD, which is simply a collection of time series on which you can operate in a distributed fashion. The time series in the collection are aligned, meaning that position X in the array representing one of the time series refers to the same time point as position X in all the other series. This approach allows you to avoid storing timestamps for each series and instead store a single `DateTimeIndex` to which all the series vectors conform.

`TimeSeriesRDD[K]` extends `RDD[(K, Vector[Double])]`, where K is the key type (usually a `String`), and the second element in the tuple is a Breeze vector representing the time series.
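The idea of aligned series sharing one index can be mimicked in plain Scala. The sketch below is not the library's class hierarchy: `AlignedSeries` and `valueAt` are hypothetical names, and a `Vector[LocalDate]` stands in for the `DateTimeIndex`.

```scala
import java.time.LocalDate

object Aligned {
  // A shared index plus one value array per key; position i in every array
  // refers to index(i). This mimics the idea only, not the library's classes.
  final case class AlignedSeries(index: Vector[LocalDate], series: Map[String, Array[Double]]) {
    require(series.values.forall(_.length == index.length), "every series must match the index")

    // Look up one key's value at a date via the shared index position.
    def valueAt(key: String, date: LocalDate): Option[Double] = {
      val pos = index.indexOf(date)
      if (pos < 0) None else series.get(key).map(_(pos))
    }
  }

  // Hypothetical example: key "B" has no measurement on the second day.
  val example: AlignedSeries = AlignedSeries(
    Vector(LocalDate.of(2015, 8, 3), LocalDate.of(2015, 8, 4)),
    Map("A" -> Array(1.0, 2.0), "B" -> Array(10.0, Double.NaN))
  )
}
```

Timestamps are stored once rather than once per series, and a missing measurement has to be represented explicitly (here as `NaN`) so that positions stay aligned.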

**Example**

Let’s use the library to play around with some ticker data. The data comes in a tab-separated values file with the columns year, month, day, ticker symbol, volume, and price.

```
2015    8    14    ADP    194911    82.99
2015    9    14    NKE    224435    111.78
2015    9    18    DO     678664    20.18
2015    8    7     TGT    147406    78.96
```

In terms of the layouts introduced above, this is a table of observations. I’ve omitted its code here for the sake of brevity, but the `loadTickerObservations` function in this example loads the file into a Spark DataFrame with three columns: “timestamp”, “symbol”, and “price”. The omitted code is vanilla Spark DataFrames code, nothing specific to `spark-ts`.

```scala
val tickerObs = loadTickerObservations(sqlContext, "../data/ticker.tsv")
```
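The real helper lives in the examples repo. As a rough stand-in for its parsing step only, the sketch below turns one line of the file into the three fields used later. `TickerObservation` and `parseLine` are hypothetical names, and using `java.time.LocalDate` for the timestamp is an assumption of this sketch, not necessarily what the repo does.

```scala
import java.time.LocalDate

object TickerParsing {
  // One parsed row, reduced to the three columns used later in the example.
  final case class TickerObservation(timestamp: LocalDate, symbol: String, price: Double)

  // Parse "year<TAB>month<TAB>day<TAB>symbol<TAB>volume<TAB>price".
  def parseLine(line: String): TickerObservation = {
    val fields = line.split('\t')
    require(fields.length == 6, s"expected 6 tab-separated fields, got ${fields.length}")
    val date = LocalDate.of(fields(0).toInt, fields(1).toInt, fields(2).toInt)
    TickerObservation(date, fields(3), fields(5).toDouble)
  }
}
```

The volume column is read but dropped, matching the three-column DataFrame described above.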

With a DataFrame of observations in hand, you can use the `timeSeriesRDDFromObservations` function to convert it into a `TimeSeriesRDD`.

```scala
// Create a daily DateTimeIndex over August and September 2015
val dtIndex = DateTimeIndex.uniform(
  new DateTime("2015-08-03"), new DateTime("2015-09-22"), new BusinessDayFrequency(1))

// Align the ticker data on the DateTimeIndex to create a TimeSeriesRDD
val tickerTsrdd = TimeSeriesRDD.timeSeriesRDDFromObservations(dtIndex, tickerObs,
  "timestamp", "symbol", "price")
```

Because a `TimeSeriesRDD` is an RDD, it supports all the standard RDD functionality. First, let’s cache it in memory so that it won’t need to be loaded from disk each time it’s referenced:

```scala
tickerTsrdd.cache()
```

Use Spark’s `count` action to see how many series it contains. In this case, there is one series per symbol.

```scala
println(tickerTsrdd.count())

104
```

Given a `TimeSeriesRDD`, it’s easy to perform operations on all the time series inside it in parallel. For example, you might want to impute missing values using linear interpolation:

```scala
val filled = tickerTsrdd.fill("linear")
```
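To show what linear interpolation means for a single series, here is a plain-Scala sketch. It is not the library's implementation: `fillLinear` is a hypothetical name, and leaving leading and trailing `NaN`s untouched is a choice made in this sketch because those positions have only one known neighbor.

```scala
object LinearFill {
  // Fill interior NaN gaps by drawing a straight line between the nearest
  // non-NaN neighbors. NaNs before the first or after the last known value
  // have only one neighbor, so this sketch leaves them untouched.
  def fillLinear(values: Array[Double]): Array[Double] = {
    val result = values.clone()
    val known = values.indices.filter(i => !values(i).isNaN)
    known.zip(known.drop(1)).foreach { case (left, right) =>
      val step = (values(right) - values(left)) / (right - left)
      for (i <- left + 1 until right) result(i) = values(left) + step * (i - left)
    }
    result
  }
}
```

For example, a gap between 1.0 and 4.0 that spans two missing points is filled with 2.0 and 3.0.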

`filled` is another `TimeSeriesRDD` containing transformed values from the original `TimeSeriesRDD`. Then you can convert your series of prices into a series of rates of return:

```scala
val returnRates = filled.returnRates()
```
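The post does not spell out the formula behind `returnRates`. Assuming simple (arithmetic) returns, the per-series computation would look like the sketch below, where `simpleReturns` is a hypothetical helper name.

```scala
object Returns {
  // Simple return between consecutive prices: r(t) = p(t) / p(t-1) - 1.
  // The result has one fewer element than the input.
  def simpleReturns(prices: Vector[Double]): Vector[Double] =
    prices.zip(prices.drop(1)).map { case (prev, cur) => cur / prev - 1.0 }
}
```

Under that assumption, a price path of 100, 150, 75 yields returns of 0.5 and -0.5.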

`returnRates` is, again, a `TimeSeriesRDD`.

Finally, perhaps you’re curious about whether the rates of return exhibit serial correlation, which is when values tend to be related to the values at directly preceding time points. One way to find out is to compute the Durbin-Watson statistic for each series.

```scala
val dwStats = returnRates.mapValues(TimeSeriesStatisticalTests.dwtest(_))
```

`dwStats` is an RDD of `(String, Double)` tuples, where the `String` is the symbol and the `Double` is the statistic. The statistic ranges from 0 to 4, with values near 2 indicating no serial correlation. You can then look at the min (strongest evidence for positive serial correlation) and the max (strongest evidence for negative serial correlation). Because tuples are ordered lexicographically, you need to swap the values in the tuples so that the statistic comes first:

```scala
println(dwStats.map(_.swap).min)

(1.3607754934239373,FISV)

println(dwStats.map(_.swap).max)

(2.9699508244045685,MO)
```
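For readers who want to see what the statistic measures, the textbook Durbin-Watson formula is the sum of squared successive differences divided by the sum of squares. The sketch below implements that formula in plain Scala. It is an illustration rather than the library's `dwtest`, and the post does not say whether the library preprocesses the series in any way.

```scala
object DurbinWatson {
  // d = sum_{t=2..T} (e_t - e_{t-1})^2 / sum_{t=1..T} e_t^2
  // d is roughly 2 * (1 - r), where r is the lag-1 autocorrelation, so values
  // well below 2 suggest positive serial correlation and values well above 2
  // suggest negative serial correlation.
  def statistic(residuals: Vector[Double]): Double = {
    val squaredDiffs = residuals.zip(residuals.drop(1)).map { case (prev, cur) =>
      (cur - prev) * (cur - prev)
    }
    squaredDiffs.sum / residuals.map(e => e * e).sum
  }
}
```

A series that flips sign at every step pushes the statistic above 2, while a series that barely changes from one step to the next pushes it toward 0.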

**Conclusion**

You should now have good insight into what it feels like to use the `spark-ts` library. If you’re interested in learning more, check out the full documentation.

Many thanks to Ahmed Mahran, Chris Dalzell, Jose Cambronero, Sean Owen, and Simon Ouellette for their contributions to the project!

*Sandy Ryza is a Data Scientist at Cloudera, and an Apache Spark committer. He is co-author of the O’Reilly Media book* Advanced Analytics with Spark.

Are values limited to Double floating point? Or could BigInt/Integer also be used?

There are applications, in particular involving computation on currencies, where that is not desirable because of floating point error, and a BigInt would be preferred.

They used the org.apache.spark.mllib.linalg.Vector class, which (from the Scaladoc) “represents a numeric vector, whose index type is Int and value type is Double.” So only Double as a value, I think.

Just the thing I need. We are working on analyzing time series data of VMs in a data center and this library came just in time: I was about to post on spark users group :)

The vector gets populated with NaNs if the time series does not have values for some instants, and NaNs remain even after fill if they are at the beginning. Shouldn’t fill take some initial value or fill with a neighbor?

I am also facing the same issue. Has your problem been resolved?

First, I am not able to read time series data from a log file. I am getting many kinds of format issues for DateTimeIndex. Is there any working sample test data and code available to refer to?

Second, I tried to simulate test data programmatically, but I am facing an issue with the fill method of TimeSeriesRDD. The fill method is supposed to derive missing values using linear interpolation, but when I tried it, the vector was populated with NaN values.

Any help or pointer on this is highly appreciated.

Thanx

Brijesh, we suggest that you post details about your issue in the “Advanced Analytics/Spark” area at community.cloudera.com.

Hi,

Has anyone tried to use this in machine learning models? I’m trying to do machine learning on time series data… Regards

http://stackoverflow.com/questions/35241173/time-series-forecast-on-spark

Link to “DateTimeIndex” not working!

Fixed, sorry about that.

Documentation link is broken.

http://cloudera.github.io/spark-timeseries/0.1.0/index.html

The correct (and current) link for docs is:

http://sryza.github.io/spark-timeseries/

I am unable to use the jar in my spark-shell using the method above…. Any help?

The link for the loadTickerObservations example is broken.

” I omitted it here for the sake of brevity, but the loadTickerObservations function in this example loads this into a Spark DataFrame with three columns”

Link fixed.

illegal state: maximal count (10,000) exceeded: evaluations

I get this error when I give higher values of p, d, q (higher than 3).

Same problem here.

That looks like a good question for the Cloudera Community.