Simple Moving Average, Secondary Sort, and MapReduce (Part 3)
- by Josh Patterson
- April 11, 2011
This is the final part of a three-part blog series. The previous parts are:
Part 1 – A Simple Moving Average in Excel
Part 2 – A Simple Moving Average in R
Previously I explained how to use Excel and R as analysis tools to calculate the Simple Moving Average of a small set of stock closing prices. In this final part, I will delve into using MapReduce to find the Simple Moving Average of our small sample data set. Then I will show how, using the same code, you can calculate the Simple Moving Average of every closing stock price since 1980.
Down the Rabbit Hole With Hadoop
In the previous parts we took a look at calculating the simple moving average of a relatively small amount of data. For a lot of analysis, Excel and R are very effective tools, but as we scale towards gigabyte, terabyte, and petabyte data stores we run into issues with data locality, disk speeds, and processing speeds.
To illustrate these factors, let’s take a mythical machine with a single 1 petabyte disk that operates at speeds similar to today’s disks. For the purposes of this example we’ll use a read speed of 40 MB/s. Let’s say that it’s our job to scan through this data and produce a simple moving average, that the processor does not impede the calculation, and that we can sustain a moving window calculation through the data at the full 40 MB/s. Let’s also assume that the data was previously sorted and that we only have to perform a sequential scan; this maximizes the throughput from the disk, so it can consistently deliver 40 MB/s to the processing pipeline. Based on Jeff Dean’s “Numbers Everyone Should Know” slide, this is a plausible setup. At this throughput our simple moving average calculation over 1 petabyte of data would take around 310 days to complete. For most situations this operational cost, in terms of time, makes the job unreasonable to consider. Fortunately, the mechanics of HDFS and MapReduce mitigate these factors so that the problem becomes a roughly linear function of time and capital, which helps us decide how many machines we want to deploy to efficiently undertake this simple moving average scan.
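The 310-day figure can be checked with a few lines of arithmetic. The sketch below assumes binary units (1 PB = 2^50 bytes, 1 MB = 2^20 bytes) and perfectly even scaling across machines, which a real cluster will not quite achieve; the class and method names are made up for illustration.

```java
// Back-of-envelope scan time for the mythical 1 PB disk.
// Assumptions: binary units and ideal, perfectly linear scaling across machines.
final class ScanTimeEstimate {
    static final double PETABYTE = Math.pow(2, 50);       // bytes
    static final double READ_RATE = 40 * Math.pow(2, 20); // 40 MB/s in bytes per second
    static final double SECONDS_PER_DAY = 86_400.0;

    /** Days needed to scan totalBytes when the data is spread evenly over `machines` disks. */
    static double scanDays(double totalBytes, double bytesPerSecondPerDisk, int machines) {
        double seconds = totalBytes / (bytesPerSecondPerDisk * machines);
        return seconds / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        for (int machines : new int[] {1, 10, 100, 1000}) {
            System.out.printf("%5d machine(s): %8.2f days%n",
                    machines, scanDays(PETABYTE, READ_RATE, machines));
        }
    }
}
```

With one disk this works out to roughly 310.7 days; under the same idealized assumptions, a thousand disks read in parallel bring the scan down to well under a day.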
In the above simple moving average example we neglected to consider the constraints of:
- Storing the petabyte of data on non-mythical hardware.
- Sorting the petabyte of data.
- Handling hardware failure during the 310 days of processing time.
Typically, time series applications need to scan the data at some point, which creates large mountains to climb if we wish to approach large volumes of time series data in today’s systems. We’re seeing multi-terabyte and multi-petabyte data sources in the time series domain every day, including:
- Sensor data
- Financial data
- Genome data
and in each of these domains the above scenario is a very real challenge to tackle.
HDFS solves the storage and failure issues above, but what about the sorting and processing issues? Sorting large amounts of data in itself is a non-trivial problem, yet is approachable with a few tricks in MapReduce. Let’s take a look at real MapReduce code that we can download to compile and produce our own scalable simple moving average, to solve some of these pain points.
Simple Moving Average in MapReduce
Typically a MapReduce application is composed of two functions: (you guessed it) a map function and a reduce function. In the world of Java programming we create a map class and a reduce class, each with inherited methods useful for their respective purposes. We use the MapReduce programming model because it is built to mitigate concurrency issues in our algorithms, and we get our scalable parallelism relatively painlessly.
The map function can involve code that performs a per-key-value pair operation, but its main logical operation is to group data by keys. A very easy way to think about a map function is to think of it as a logical projection of the data or a group by clause. The reduce function is used to take these groups (individually) and run a process across the values which were grouped together. Common operations in reduce functions include:
- Avg
- Min/Max
- Sum
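To make the "group by, then aggregate" idea concrete, here is a minimal plain-Java sketch that imitates the two phases without Hadoop. The "symbol,close" record layout and all names are assumptions made for illustration; in a real job the framework does the grouping between the two functions.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupByAverage {
    /** "Map" step: project a hypothetical "symbol,close" record to a (symbol, close) pair. */
    static Map.Entry<String, Double> map(String record) {
        String[] fields = record.split(",");
        return new AbstractMap.SimpleEntry<>(fields[0], Double.parseDouble(fields[1]));
    }

    /** "Reduce" step: aggregate all values of one group, here as an average. */
    static double reduce(List<Double> values) {
        double sum = 0;
        for (double v : values) sum += v;
        return sum / values.size();
    }

    /** Stand-in for the framework: group the mapped pairs by key, then reduce each group. */
    static Map<String, Double> run(List<String> records) {
        Map<String, List<Double>> groups = new TreeMap<>();
        for (String record : records) {
            Map.Entry<String, Double> kv = map(record);
            groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<Double>> group : groups.entrySet()) {
            result.put(group.getKey(), reduce(group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("AA,10.0", "IBM,100.0", "AA,20.0")));
    }
}
```

Note that the reduce step here never cares about the order of the values inside a group, which is exactly the property a moving average does not have.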
In our simple moving average example, however, we don’t operate on a per value basis specifically, nor do we produce an aggregate across all of the values. Our operation in the aggregate sense involves a sliding window, which performs its operations on a subset of the data at each step. We also have to consider that the points in our time series data are not guaranteed to arrive at the reduce in order and need to be sorted, as mentioned in previous sections. This is because, with multiple map functions reading multiple sections of the source data, MapReduce does not impose any order on the key-value pairs that are grouped together in the default partition and sorting schemes. There is the scenario where we have sorted partitioned data, but for the sake of this example we’re going to deal with the more “garden-variety” unsorted time series data.
Let’s take a first pass at how we could design this MapReduce simple moving average job. We want to group all of one stock’s adjusted close values together so we can apply the simple moving average operation over the sorted time series data. We want to emit each time series key value pair keyed on a stock symbol to group these values together. In the reduce phase we can run an operation, here the simple moving average, over the data. Since the data more than likely will not arrive at the reducer in sorted order we’ll need to sort the data before we can calculate the simple moving average.
A common way to sort data is to load it into memory in a data structure such as a heap, much as we would in a normal Java program. In this case we’ll use Java’s priority queue class to sort our data. We also need to consider the amount of memory used by the incoming time series data during sorting, as this is a limiting factor on how much data we can sort. In this design we have to load all of the time series data before we can start processing, and if the amount of data to sort exceeds the available heap size we have a problem. An example of this implementation is hosted on GitHub, in the Caduceus repository cloned in the steps below.
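As a rough illustration of this first design (a sketch written for this explanation, not the code from the repository), the reducer-side logic might look like the following. The Point class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class HeapSortedMovingAverage {
    /** One time series sample; the field names are illustrative. */
    static final class Point {
        final long timestamp;
        final double close;
        Point(long timestamp, double close) { this.timestamp = timestamp; this.close = close; }
    }

    /** Sorts one group's points in memory, then averages every full window of `window` points. */
    static List<Double> movingAverages(Iterable<Point> unsorted, int window) {
        // Every point is buffered before any output is produced,
        // so memory use grows with the size of the group.
        PriorityQueue<Point> heap =
                new PriorityQueue<>(Comparator.comparingLong((Point p) -> p.timestamp));
        for (Point p : unsorted) heap.add(p);

        // Polling the heap yields the points in timestamp order.
        List<Double> sorted = new ArrayList<>();
        while (!heap.isEmpty()) sorted.add(heap.poll().close);

        List<Double> averages = new ArrayList<>();
        for (int end = window; end <= sorted.size(); end++) {
            double sum = 0;
            for (int i = end - window; i < end; i++) sum += sorted.get(i);
            averages.add(sum / window);
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Point> points = Arrays.asList(
                new Point(3, 30.0), new Point(1, 10.0), new Point(2, 20.0), new Point(4, 40.0));
        System.out.println(movingAverages(points, 2));
    }
}
```

The heap is the weak spot: nothing can be emitted until the whole group has been read, which is where the heap size limit comes from.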
To run this code on your own Hadoop cluster, download CDH from Cloudera and set up a pseudo-distributed cluster, which is a single node of Hadoop. Pseudo-distributed mode is a great way to try out code with Hadoop. Next, download and compile the moving average code into a jar. To download the code directly from GitHub (in the shell on Mac OS X, an ssh terminal window on Linux, or MINGW32 on win32) we’ll use the command:
git clone git://github.com/jpatanooga/Caduceus
To compile we can either use Ant and simply type:
ant
or we can open the code in our favorite Java IDE and compile it into a jar (make sure to add the lib directory into the jar). Then copy this jar to your cluster to run the job. Next we’ll need to copy the input data from the project’s local data subdirectory to a place in HDFS. Specifically, this file is yahoo_stock_AA_32_mini.csv, which is downloaded by the git clone command above into the data/movingaverage subdirectory of the project. We’ll need to copy this data into HDFS with the command:
hadoop fs -copyFromLocal data/movingaverage/yahoo_stock_AA_32_mini.csv /<somewhere_in_hdfs>
With the jar on the VM (or a cluster-accessible machine) and our sample data loaded into HDFS, we can run the job with the command:
hadoop jar Caduceus-0.1.0.jar tv.floe.caduceus.hadoop.movingaverage.NoShuffleSort_MovingAverageJob <input_hdfs_dir_where_we_put_data> <output_hdfs_results_dir>
After we run the MapReduce job, we can take a look at the results with the command:
hadoop fs -cat /<output_hdfs_results_dir>/part-00000
which should show:
Group: AA, Date: 2008-03-03 Moving Average: 33.529335
Group: AA, Date: 2008-03-04 Moving Average: 34.529335
Group: AA, Date: 2008-03-05 Moving Average: 35.396
Our first pass is a decent solution, but we’re limited by our Java Virtual Machine (JVM) child heap size and we are taking time to manually sort the data ourselves. With a few design changes, we can solve both of these issues by taking advantage of some inherent properties of MapReduce. First we want to look at the case of sorting the data in memory on each reducer. Currently we have to make sure we never send more data to a single reducer than can fit in memory. The way we can currently control this is to give each reducer child JVM more heap and/or to further partition our time series data in the map phase. In this case we’d partition further by time, breaking our data into smaller windows of time.
As opposed to further partitioning the data, another approach to this issue is to let Hadoop sort the data for us in what’s called the “shuffle phase” of MapReduce. If the data arrives at a reducer already in sorted order, we can lower our memory footprint and reduce the number of loops through the data by only looking at the next N samples for each simple moving average calculation. This brings us to the crucial aspect of this article: the shuffle’s “secondary sort” mechanic. Sorting is something we can let Hadoop do for us, and Hadoop has proven to be quite good at sorting large amounts of data, winning the terabyte sort benchmark (the sorting competition started by Jim Gray) in 2008. By using the secondary sort mechanic we can solve both our heap and sort issues fairly simply and efficiently. To employ secondary sort in our code, we need to make the key a composite of the natural key and the natural value. Below in Figure-1 we see a diagram of how this would look visually.

Figure-1: Composite Key Diagram
The Composite Key gives Hadoop the needed information during the shuffle to perform a sort not only on the “stock symbol”, but on the time stamp as well. The class that sorts these Composite Keys is called the key comparator or here “CompositeKeyComparator”. The key comparator should order by the composite key, which is the combination of the natural key and the natural value. We can see below in Figure-2 where an abstract version of secondary sort is being performed on a composite key of 2 integers.

Figure-2: CompositeKeyComparator sorting Composite Keys (keys are integers).
In Figure-3 below we see a more realistic example where we’ve changed the Composite Key to have a stock symbol string (K1) and a timestamp (K2, displayed as a date, but in the code is a long in ms). The diagram has sorted the K/V pairs by both “K1: stock symbol” (natural key) and “K2: time stamp” (secondary key).

Figure-3: CompositeKeyComparator at work on our composite keys. Composite key now represented with a string stock symbol (K1) and a date (K2).
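The ordering shown in Figure-3 can be sketched in plain Java as a two-level comparison: first on the stock symbol, then on the timestamp. This is a Hadoop-free illustration with hypothetical names; the project's own CompositeKeyComparator plugs the same idea into the shuffle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class CompositeKeySortSketch {
    /** Composite key: K1 is the natural key, K2 the value we want a secondary sort on. */
    static final class Key {
        final String symbol;    // K1: stock symbol
        final long timestampMs; // K2: timestamp in milliseconds
        Key(String symbol, long timestampMs) { this.symbol = symbol; this.timestampMs = timestampMs; }
        @Override public String toString() { return symbol + "@" + timestampMs; }
    }

    /** Order by the whole composite key: symbol first, then timestamp. */
    static final Comparator<Key> COMPOSITE_ORDER =
            Comparator.comparing((Key k) -> k.symbol).thenComparingLong(k -> k.timestampMs);

    /** Returns a sorted copy, mimicking the order in which the shuffle would deliver the keys. */
    static List<Key> sortLikeShuffle(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(COMPOSITE_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
                new Key("IBM", 2), new Key("AA", 3), new Key("IBM", 1), new Key("AA", 1));
        System.out.println(sortLikeShuffle(keys)); // [AA@1, AA@3, IBM@1, IBM@2]
    }
}
```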
Once we’ve sorted our data on the composite key, we now need to partition the data for the reduce phase. In Figure-4 below we see how the data from Figure-3 above has been partitioned with the NaturalKeyPartitioner.

Figure-4: Partitioning by the natural key with the NaturalKeyPartitioner.
Once we’ve partitioned our data, the reducers can start downloading the partition files and begin their merge phase. In Figure-5 below we see how the grouping comparator, or NaturalKeyGroupingComparator, is used to make sure a reduce() call only sees the logically grouped data meant for that composite key.

Figure-5: Grouping Comparator merging partition files.
The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.
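The rule above can be sketched without Hadoop as two small functions that deliberately ignore the timestamp half of the key. The names are hypothetical; the hash-and-modulo formula follows the shape used by Hadoop's default HashPartitioner, applied here to the natural key alone.

```java
import java.util.Comparator;

final class NaturalKeyRoutingSketch {
    /** Composite key made of a natural key (symbol) and a secondary sort value (timestamp). */
    static final class Key {
        final String symbol;
        final long timestampMs;
        Key(String symbol, long timestampMs) { this.symbol = symbol; this.timestampMs = timestampMs; }
    }

    /** Partition on the natural key only, so every point of one symbol reaches the same reducer. */
    static int partition(Key key, int numReducers) {
        // Masking with Integer.MAX_VALUE keeps the result non-negative for negative hash codes.
        return (key.symbol.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    /** Grouping order: keys with the same symbol compare as equal, whatever their timestamps. */
    static final Comparator<Key> GROUPING = Comparator.comparing((Key k) -> k.symbol);

    public static void main(String[] args) {
        Key first = new Key("AA", 1L);
        Key later = new Key("AA", 99L);
        System.out.println(partition(first, 4) == partition(later, 4)); // true
        System.out.println(GROUPING.compare(first, later));             // 0
    }
}
```

In Hadoop's `org.apache.hadoop.mapreduce.Job` API, the three roles are registered with `setSortComparatorClass`, `setPartitionerClass`, and `setGroupingComparatorClass`; the older `JobConf` API has equivalent setters.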
Below is a short description of the Simple Moving Average code, which has been altered to use the secondary sort and is hosted on GitHub. You’ll notice that the names of the classes closely match the terminology used in the diagrams above and in Tom White’s “Hadoop: The Definitive Guide” (chapter 8, “MapReduce Features”), so as to make the code easier to understand.
NaturalKey – what you would normally use as the key or “group by” operator.
- In this case the natural key is the “group” or “stock symbol”, as we need to group potentially unsorted stock data before we can sort it and calculate the simple moving average.
Composite Key – a key that is a combination of the natural key and the natural value we want to sort by.
- In this case it is the TimeseriesKey class, which has two members: “Group” (the natural key) and “Timestamp” (the natural value).
CompositeKeyComparator – compares two composite keys for sorting.
- Should order by the composite key.
NaturalKeyPartitioner – the partitioner should consider only the natural key.
- Blocks all data into a logical group, inside which we want the secondary sort to occur on the natural value, or the second half of the composite key.
- A normal hash partitioner would hash the whole object, so key/value pairs belonging to one group could be sent to separate reducers.
NaturalKeyGroupingComparator – should consider only the natural key.
- Inside a partition, a reducer is run on the different groups inside of the partition.
- A custom grouping comparator makes sure that a single reduce() call sees a custom view of the groups, sometimes grouping values across natural value “borders” in the composite key.
In this example we also use a sliding window, which is slight overkill for a process as simple as a simple moving average; this was done to set up the basis for more complex moving window techniques such as classification and other statistical methods.
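Once the values arrive in timestamp order, a sliding window only needs to hold the last N values. The following is a minimal sketch of that idea under the assumption of a fixed-size window over evenly spaced samples; it is not the sliding window class from the repository, and the names are made up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class SlidingWindowAverage {
    private final int size;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum;

    SlidingWindowAverage(int size) {
        if (size <= 0) throw new IllegalArgumentException("window size must be positive");
        this.size = size;
    }

    /** Feeds the next value in time order; returns the average once the window is full, else null. */
    Double add(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > size) sum -= window.removeFirst(); // drop the oldest value
        return window.size() == size ? sum / size : null;
    }

    /** Convenience helper: averages of all full windows over an already sorted stream. */
    static List<Double> averages(Iterable<Double> sortedValues, int size) {
        SlidingWindowAverage sma = new SlidingWindowAverage(size);
        List<Double> out = new ArrayList<>();
        for (double v : sortedValues) {
            Double avg = sma.add(v);
            if (avg != null) out.add(avg);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(averages(Arrays.asList(10.0, 20.0, 30.0, 40.0), 2));
    }
}
```

Memory use is bounded by the window size rather than by the size of the group, which is the payoff of letting the shuffle do the sorting. The running sum can accumulate floating-point error over very long series, so a production version might recompute the sum periodically.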
The classes needed to run this code on your own Hadoop cluster were already downloaded by the git clone command above. If you haven’t already, use Ant or open the code in your favorite Java IDE and compile it into a jar. Then copy this jar to your cluster to run the job. Once the jar is on the VM (or a cluster-accessible machine) and the data above is copied into HDFS, run the job with the command:
hadoop jar Caduceus-0.1.0.jar tv.floe.caduceus.hadoop.movingaverage.MovingAverageJob <input_hdfs_dir_where_we_put_data> <output_hdfs_results_dir>
The output should look identical to the output from the previous MapReduce run.
Now let’s throw more data at MapReduce and use the same code as earlier to see how we can really leverage the power of Hadoop. Download the NYSE stock data from infochimps:
http://infochimps.com/datasets/daily-1970-2010-open-close-hi-low-and-volume-nyse-exchange
and load it into Hadoop. Now run the same MapReduce application as above, except use the HDFS directory you placed the NYSE data in as the source. Depending on the size of your cluster, you may only want to run this on one file to save time. Your output should be similar to the above, except that it should include a moving average for every stock in the input files. (Note: the Yahoo! stock data is missing many values, which can cause the outcome to fluctuate depending on how the sliding window for the SMA is processed and calculated. I’ve left the exercise of adjusting the window for variations such as this to the reader.)
So that’s how we’d take a lot of unsorted stock data and produce a simple moving average for each stock using Hadoop. While at first some of the plumbing may be a bit non-intuitive, once you get the hang of it the secondary sort technique opens up a lot of applications in the sensor, financial, and genomic classes.
Summary
This three part blog series explained the structure of a moving average application implemented in MapReduce, and illustrated the power of secondary sort. This article also described how to work with infochimps data in a sliding window. This same technique can be used as the basis for solving many other financial analysis problems. And finally, by following the above examples the reader now has base code which can be played with and used to build other custom applications.
Feel free to ask questions and follow author Josh Patterson on Twitter: @jpatanooga.
-
Evan Sparks /
April 15, 2011 / 11:12 AM
Cool trick with the split sorter/partitioner. As far as I can tell this works great until the series become extremely long (think 30 years of tick-level data) – seems like partitioning by time might be very tricky. Do you know of anything built into hadoop like an “overlapping partitioner” which can spit the same data to multiple partitions?
I have experimented with mappers that duplicate values across multiple keys, but I wonder if there’s a more conventional way of doing this.
-
Josh Patterson /
April 16, 2011 / 8:55 AM
Evan,
You are dead on with the size of the data in a single keyspace. I hit this same issue when working on the openPDC project for the NERC: One sensor could have literally billions of points in a very short amount of time, so for prototype jobs we keyed things to a single day (3,600,000ms):
In a more complex version I would have used overlapping time slots so the mapper would get sufficient data from adjacent keyspaces to cover a single window length. For now I’d say you are on the right track with the duplicate values.
Josh
-
Ashwin Jayaprakash /
April 11, 2012 / 4:54 PM
I know this is not related to moving averages, but how accurate was the SAX time series matching used in PDC?
-
Jacob /
April 28, 2013 / 11:59 AM
Hi,
I implemented something like this (except using the MapReduce API 2), and in the loop of the reduce() function, whenever the .next() method is called on the Iterator, we get a new value, but the _key_ also miraculously changes. Rather, the part of the composite key that was not used as a natural key (the timestamp in this example) changes. This was quite surprising. How does this happen?
Thanks