# Simple Moving Average, Secondary Sort, and MapReduce (Part 3)

- by Josh Patterson
- April 11, 2011
- 5 comments

*This is the final piece of a three-part blog series. If you would like to view the previous parts of this series, please use the following links:*

*Part 1 – A Simple Moving Average in Excel*

*Part 2 – A Simple Moving Average in R*

Previously I explained how to use Excel and R as the analysis tools to calculate the Simple Moving Average of a small set of stock closing prices. In this final piece of the three-part blog series, I will delve into using MapReduce to find the Simple Moving Average of our small sample data set. Then I will show how, using the same code, you can calculate the Simple Moving Average of every closing stock price since 1980.

## Down the Rabbit Hole With Hadoop

In the previous examples we took a look at calculating the simple moving average of a relatively small amount of data. For a lot of analysis, Excel and R are very effective tools, but as we scale towards gigabyte, terabyte, and petabyte data stores we run into some issues with data locality, disk speeds, and processing speeds.

To illustrate these factors let’s take a mythical machine that had a single 1 petabyte disk, which operated similarly to disk speeds today. For the purposes of this example we’ll use a read speed of 40 MB/s. Let’s say that it’s our job to scan through this data and produce a simple moving average, the processor does not impede the calculation, and we can sustain a moving window calculation through the data at the full 40 MB/s. Let’s also assume that the data was previously sorted and that we only had to perform a sequential scan; this maximizes the data throughput rate from the disk and it could consistently deliver 40MB/s to the processing pipeline. Based on Jeff Dean’s “12 Numbers Every Engineer Should Know” slide this is a plausible setup. At this throughput our simple moving average calculation of 1 petabyte of data would take around *310 days to complete*. For most situations this operational cost, in terms of time, makes it unreasonable to consider. Fortunately, the mechanics of HDFS and MapReduce mitigate these factors such that we can make this problem a linear time and capital function to help us decide the number of machines we want to implement to efficiently undertake this simple moving average scan.
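The 310-day figure can be checked with a few lines of arithmetic. The sketch below is only a back-of-the-envelope estimate: it assumes binary units (1 PB = 2^50 bytes, 1 MB = 2^20 bytes), and the `ScanTimeEstimate` class, its method name, and the 100-machine cluster size are made up for illustration. The multi-machine case assumes perfectly linear scaling, which ignores sorting, shuffling, and replication overhead.

```java
// Back-of-the-envelope scan time for the mythical single-disk machine.
// Assumption: binary units (1 PB = 2^50 bytes, 1 MB = 2^20 bytes).
class ScanTimeEstimate {
    static final double PETABYTE = Math.pow(2, 50);
    static final double MEGABYTE = Math.pow(2, 20);
    static final double SECONDS_PER_DAY = 86_400.0;

    /**
     * Days needed to scan `bytes` when `machines` disks each read
     * `mbPerSec` megabytes per second in parallel (idealized linear scaling).
     */
    static double scanDays(double bytes, double mbPerSec, int machines) {
        double seconds = bytes / (mbPerSec * MEGABYTE * machines);
        return seconds / SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        // One disk at 40 MB/s: roughly 310.7 days.
        System.out.println("1 machine:    " + scanDays(PETABYTE, 40, 1) + " days");
        // Hypothetical 100-machine cluster, each scanning its own slice.
        System.out.println("100 machines: " + scanDays(PETABYTE, 40, 100) + " days");
    }
}
```

Under these assumptions a single disk needs about 310.7 days, and each doubling of the machine count halves the scan time, which is the "linear time and capital function" described above.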

In the above simple moving average example we neglected to consider the constraints of:

- Storing the petabyte of data on non-mythical hardware.
- Sorting the petabyte of data.
- Considering hardware failure during the 310 days of processing time.

Typically, time series applications need to scan the data at some point, which creates large mountains to climb, if we wish to approach large tomes of time series data in today’s systems. We’re seeing multi-terabyte and multi-petabyte data sources in the time series domain every day, including

- Sensor data
- Financial data
- Genome data

and in each of these domains the above scenario is a very real challenge to tackle.

HDFS solves the storage and failure issues above, but what about the sorting and processing issues? Sorting large amounts of data in itself is a non-trivial problem, yet is approachable with a few tricks in MapReduce. Let’s take a look at real MapReduce code that we can download to compile and produce our own scalable simple moving average, to solve some of these pain points.

**Simple Moving Average in MapReduce**

Typically a MapReduce application is composed of two functions: (you guessed it) a map function and a reduce function. In the world of Java programming we create a map class and a reduce class, each with inherited methods useful for their respective purposes. We use the MapReduce programming model because it is built to mitigate concurrency issues in our algorithms and we get our scalable parallelism relatively painlessly.

The map function can involve code that performs a per-key-value pair operation, but its main logical operation is to group data by keys. A very easy way to think about a map function is to think of it as a logical projection of the data or a group by clause. The reduce function is used to take these groups (individually) and run a process across the values which were grouped together. Common operations in reduce functions include:

- Avg
- Min/Max
- Sum
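To make the "group by, then aggregate" idea concrete, here is a minimal sketch in plain Java. It does not use the Hadoop API: the `GroupThenAggregate` class, the `Quote` record shape, and the sample prices are all hypothetical, and the grouping that the framework performs between map and reduce is simulated with an in-memory map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of map (group by key) and reduce (aggregate per group).
// This is an illustration only, not Hadoop code.
class GroupThenAggregate {
    /** One input record: a stock symbol and a closing price (hypothetical shape). */
    static final class Quote {
        final String symbol;
        final double close;
        Quote(String symbol, double close) {
            this.symbol = symbol;
            this.close = close;
        }
    }

    /** "Map" step: emit (symbol, close) pairs, collected by key as the framework would. */
    static Map<String, List<Double>> mapAndGroup(List<Quote> quotes) {
        Map<String, List<Double>> groups = new TreeMap<>();
        for (Quote q : quotes) {
            groups.computeIfAbsent(q.symbol, k -> new ArrayList<>()).add(q.close);
        }
        return groups;
    }

    /** "Reduce" step: called once per group, here computing the average. */
    static double reduceAverage(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    public static void main(String[] args) {
        List<Quote> quotes = Arrays.asList(
                new Quote("AA", 10.0), new Quote("IBM", 5.0), new Quote("AA", 20.0));
        for (Map.Entry<String, List<Double>> group : mapAndGroup(quotes).entrySet()) {
            System.out.println(group.getKey() + " avg = " + reduceAverage(group.getValue()));
        }
    }
}
```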

In our simple moving average example, however, we don’t operate on a per-value basis specifically, nor do we produce an aggregate across all of the values. Our operation in the aggregate sense involves a sliding window, which performs its operations on a subset of the data at each step. We also have to consider that the points in our time series data are not guaranteed to arrive at the reducer in order and need to be sorted, as mentioned in previous sections. This is because, with multiple map functions reading multiple sections of the source data, MapReduce does not impose any order on the key-value pairs that are grouped together in the default partition and sorting schemes. There is the scenario where we have sorted partitioned data, but for the sake of this example we’re going to deal with the more “garden-variety” unsorted time series data.

Let’s take a first pass at how we could design this MapReduce simple moving average job. We want to group all of one stock’s adjusted close values together so we can apply the simple moving average operation over the sorted time series data. We want to emit each time series key value pair keyed on a stock symbol to group these values together. In the reduce phase we can run an operation, here the simple moving average, over the data. Since the data more than likely will not arrive at the reducer in sorted order we’ll need to sort the data before we can calculate the simple moving average.

A common way to sort data is to load it into memory in a data structure such as a heap, much as we would in a normal Java program. In this case we’ll use Java’s PriorityQueue class to sort our data. We also need to consider the amount of memory used by the incoming time series data during sorting, as this is a limiting factor on how much data we can sort. In this design we have to load all of the time series data before we can start processing, and if the amount of data to sort exceeds the available heap size we have a problem. An example of this implementation is hosted on GitHub, in the Caduceus repository cloned in the commands below.
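Separately from the hosted code, the in-memory approach can be sketched in a few lines of plain Java. This is not the Caduceus implementation: the `InMemorySortMovingAverage` class, the `Point` type, and the window handling are assumptions made for illustration. It shows the two steps described above, loading every point into a `PriorityQueue` and then sliding a fixed-size window over the points as they come off the heap in timestamp order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the first-pass design: sort one stock's points in memory, then average.
class InMemorySortMovingAverage {
    /** One time series point (hypothetical shape): timestamp in ms and a value. */
    static final class Point {
        final long timestamp;
        final double value;
        Point(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Returns one average per full window, in timestamp order. */
    static List<Double> movingAverage(Iterable<Point> unsorted, int window) {
        Comparator<Point> byTime = Comparator.comparingLong(p -> p.timestamp);
        PriorityQueue<Point> heap = new PriorityQueue<>(byTime);
        for (Point p : unsorted) {
            heap.add(p); // the whole series must fit in the heap: the memory limit
        }

        List<Double> averages = new ArrayList<>();
        double[] ring = new double[window]; // the last `window` values
        double sum = 0;
        int seen = 0;
        while (!heap.isEmpty()) {
            double v = heap.poll().value;   // smallest timestamp first
            int slot = seen % window;
            if (seen >= window) {
                sum -= ring[slot];          // drop the value leaving the window
            }
            ring[slot] = v;
            sum += v;
            seen++;
            if (seen >= window) {
                averages.add(sum / window);
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        List<Point> points = Arrays.asList(
                new Point(3, 30.0), new Point(1, 10.0), new Point(2, 20.0), new Point(4, 40.0));
        System.out.println(movingAverage(points, 2));
    }
}
```

The heap holds every point for the key before the first average is produced, which is exactly why this design is bounded by the reducer's heap size.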

To run this code on your own Hadoop cluster, download CDH from Cloudera and set up a pseudo-distributed cluster, which is a single node of Hadoop. Pseudo-distributed mode is a great way to try out code with Hadoop. Next, download and compile the moving average code into a jar. To download the code directly from GitHub (in the shell on Mac OS X, an ssh terminal window on Linux, or MINGW32 on Win32) we’ll use the command:

    git clone git://github.com/jpatanooga/Caduceus

To compile we can either use Ant and simply type:

    ant

or we can open the code in our favorite Java IDE and compile it into a jar (make sure to add the lib directory into the jar). Then copy this jar to your cluster to run the job. Next we’ll need to copy the input data from the project’s local data subdirectory to a place in HDFS. Specifically this file is yahoo_stock_AA_32_mini.csv, which is downloaded by the git clone command above into the /data/movingaverage subdirectory of the project. We’ll need to copy this data into HDFS with the command:

    hadoop fs -copyFromLocal data/movingaverage/yahoo_stock_AA_32_mini.csv /<somewhere_in_hdfs>

With the jar on the VM (or a cluster-accessible machine) and our sample data loaded into HDFS, we will run the job with the command:

    hadoop jar Caduceus-0.1.0.jar tv.floe.caduceus.hadoop.movingaverage.NoShuffleSort_MovingAverageJob <input_hdfs_dir_where_we_put_data> <output_hdfs_results_dir>

After we run the MapReduce job, we can take a look at the results with the command:

    hadoop fs -cat /<output_hdfs_results_dir>/part-00000

which should show:

    Group: AA, Date: 2008-03-03 Moving Average: 33.529335
    Group: AA, Date: 2008-03-04 Moving Average: 34.529335
    Group: AA, Date: 2008-03-05 Moving Average: 35.396

Our first pass is a decent solution, but we’re limited by our Java Virtual Machine (JVM) child heap size and we are taking time to manually sort the data ourselves. With a few design changes, we can solve both of these issues taking advantage of some inherent properties of MapReduce. First we want to look at the case of sorting the data in memory on each reducer. Currently we have to make sure we never send more data to a single reducer than can fit in memory. The way we can currently control this is to give each reducer child JVM more heap and/or to further partition our time series data in the map phase. In this case we’d partition further by time, breaking our data into smaller windows of time.

As opposed to further partitioning of the data, another approach to this issue is to allow Hadoop to sort the data for us in what’s called the “shuffle phase” of MapReduce. If the data arrives at a reducer already in sorted order we can lower our memory footprint and reduce the number of loops through the data by only looking at the next N samples for each simple moving average calculation. This brings us to the crucial aspect of this article, which is called the shuffle’s “secondary sort” mechanic. Sorting is something we can let Hadoop do for us, and Hadoop has proven to be quite good at sorting large amounts of data, winning the Terabyte Sort category of Jim Gray’s sort benchmark in 2008. In using the secondary sort mechanic we can solve both our heap and sort issues fairly simply and efficiently. To employ secondary sort in our code, we need to make the key a composite of the natural key and the natural value. Below in Figure-1 we see a diagram of how this would look visually.

Figure-1: *Composite Key Diagram*

The Composite Key gives Hadoop the needed information during the shuffle to perform a sort not only on the “stock symbol”, but on the time stamp as well. The class that sorts these Composite Keys is called the key comparator or here “CompositeKeyComparator”. The key comparator should order by the composite key, which is the combination of the natural key and the natural value. We can see below in Figure-2 where an abstract version of secondary sort is being performed on a composite key of 2 integers.

Figure-2: *CompositeKeyComparator sorting Composite Keys (keys are integers).*

In Figure-3 below we see a more realistic example where we’ve changed the Composite Key to have a stock symbol string (K1) and a timestamp (K2, displayed as a date, but in the code is a long in ms). The diagram has sorted the K/V pairs by both “K1: stock symbol” (natural key) and “K2: time stamp” (secondary key).

Figure-3: *CompositeKeyComparator at work on our composite keys. Composite key now represented with a string stock symbol (K1) and a date (K2).*
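The ordering shown in the figures can be sketched in plain Java. This is an illustration rather than the project's Hadoop classes: the `CompositeKeySortSketch` class, the field names, and the sample keys are assumptions, and in a real job the comparator would be registered with the framework instead of being called on a list. The comparison logic is the point: order by the stock symbol first, and break ties on the timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of a composite key and the comparator that orders it.
class CompositeKeySortSketch {
    /** Composite key: natural key (K1, stock symbol) plus secondary key (K2, timestamp in ms). */
    static final class CompositeKey {
        final String symbol;
        final long timestampMs;
        CompositeKey(String symbol, long timestampMs) {
            this.symbol = symbol;
            this.timestampMs = timestampMs;
        }
        @Override
        public String toString() {
            return symbol + "@" + timestampMs;
        }
    }

    /** Orders by symbol first, then by timestamp within the same symbol. */
    static final Comparator<CompositeKey> COMPOSITE_KEY_COMPARATOR =
            new Comparator<CompositeKey>() {
                @Override
                public int compare(CompositeKey a, CompositeKey b) {
                    int bySymbol = a.symbol.compareTo(b.symbol);
                    if (bySymbol != 0) {
                        return bySymbol;
                    }
                    return Long.compare(a.timestampMs, b.timestampMs);
                }
            };

    /** Returns a sorted copy, leaving the input untouched. */
    static List<CompositeKey> sorted(List<CompositeKey> keys) {
        List<CompositeKey> copy = new ArrayList<>(keys);
        Collections.sort(copy, COMPOSITE_KEY_COMPARATOR);
        return copy;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = Arrays.asList(
                new CompositeKey("IBM", 2L), new CompositeKey("AA", 3L),
                new CompositeKey("AA", 1L), new CompositeKey("IBM", 1L));
        System.out.println(sorted(keys));
    }
}
```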

Once we’ve sorted our data on the composite key, we now need to partition the data for the reduce phase. In Figure-4 below we see how the data from Figure-3 above has been partitioned with the NaturalKeyPartitioner.

Figure-4: *Partitioning by the natural key with the NaturalKeyPartitioner*.
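Partitioning by the natural key means the timestamp half of the composite key must play no part in choosing a reducer. The sketch below is a plain-Java illustration, not the project's NaturalKeyPartitioner: the `NaturalKeyPartitionSketch` class and its method are hypothetical. The hash-and-modulo formula is the one Hadoop's default `HashPartitioner` applies to a whole key; here it is applied to the stock symbol only.

```java
// Plain-Java sketch of partitioning a composite key by its natural key only.
class NaturalKeyPartitionSketch {
    /**
     * Picks a partition in [0, numPartitions) from the stock symbol alone.
     * The timestamp is accepted to mirror the composite key but deliberately ignored,
     * so every record for one symbol lands in the same partition.
     */
    static int partitionFor(String symbol, long timestampMs, int numPartitions) {
        // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
        return (symbol.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int reducers = 4; // hypothetical reducer count
        System.out.println("AA  @ t=1   -> " + partitionFor("AA", 1L, reducers));
        System.out.println("AA  @ t=999 -> " + partitionFor("AA", 999L, reducers));
        System.out.println("IBM @ t=1   -> " + partitionFor("IBM", 1L, reducers));
    }
}
```

If the timestamp were included in the hash, points for one stock would scatter across reducers and no single reducer could slide a window over the full series.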

Once we’ve partitioned our data the reducers can now start downloading the partition files and begin their merge phase. In Figure-5 below we see how the grouping comparator, or NaturalKeyGroupingComparator, is used to make sure a reduce() call only sees the logically grouped data meant for that composite key.

Figure-5: *Grouping Comparator merging partition files*.

The partitioner and grouping comparator for the composite key should consider only the natural key for partitioning and grouping.
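Putting the pieces together, the payoff of secondary sort is that the reduce side no longer sorts anything and only keeps the current window in memory. The following plain-Java sketch simulates that: the `GroupedSlidingWindowSketch` class, the `Record` type, and the sample data are assumptions, and the input list stands in for records that have already been sorted by (symbol, timestamp) during the shuffle. A change of symbol plays the role of the grouping comparator's boundary between reduce() calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the reduce side after secondary sort: grouped by natural key, streamed in order.
class GroupedSlidingWindowSketch {
    /** One sorted record (hypothetical shape): composite key fields plus the value. */
    static final class Record {
        final String symbol;
        final long timestampMs;
        final double close;
        Record(String symbol, long timestampMs, double close) {
            this.symbol = symbol;
            this.timestampMs = timestampMs;
            this.close = close;
        }
    }

    /** Input must already be sorted by (symbol, timestamp), as it would be after the shuffle. */
    static Map<String, List<Double>> movingAverages(List<Record> sorted, int window) {
        Map<String, List<Double>> out = new LinkedHashMap<>();
        Deque<Double> recent = new ArrayDeque<>(); // at most `window` values kept
        double sum = 0;
        String current = null;
        for (Record r : sorted) {
            if (!r.symbol.equals(current)) { // group boundary: a new natural key starts
                current = r.symbol;
                recent.clear();
                sum = 0;
                out.put(current, new ArrayList<Double>());
            }
            recent.addLast(r.close);
            sum += r.close;
            if (recent.size() > window) {
                sum -= recent.removeFirst(); // oldest value leaves the window
            }
            if (recent.size() == window) {
                out.get(current).add(sum / window);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Record> sorted = Arrays.asList(
                new Record("AA", 1L, 10.0), new Record("AA", 2L, 20.0), new Record("AA", 3L, 30.0),
                new Record("IBM", 1L, 100.0), new Record("IBM", 2L, 200.0));
        System.out.println(movingAverages(sorted, 2));
    }
}
```

Memory use here depends on the window length rather than on the length of the series, which is the heap issue the first-pass design could not avoid.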

Below is a short description of the Simple Moving Average code, which has been altered to use the secondary sort and is hosted on GitHub. You’ll notice that the names of the classes closely match the terminology used in the diagrams above and in Tom White’s “Hadoop: The Definitive Guide” (chapter 8, “MapReduce Features”) so as to make the code easier to understand.

**NaturalKey** – what you would normally use as the key or “group by” operator.

- In this case the Natural Key is the “group” or “stock symbol” as we need to group potentially unsorted stock data before we can sort it and calculate the simple moving average.

**Composite Key** – A Key that is a combination of the natural key and the natural value we want to sort by.


## 5 Responses

Evan Sparks / April 15, 2011 / 11:12 AM

Cool trick with the split sorter/partitioner. As far as I can tell this works great until the series become extremely long (think 30 years of tick-level data) – seems like partitioning by time might be very tricky. Do you know of anything built into hadoop like an “overlapping partitioner” which can spit the same data to multiple partitions?

I have experimented with mappers that duplicate values across multiple keys, but I wonder if there’s a more conventional way of doing this.

Josh Patterson / April 16, 2011 / 8:55 AM

Evan,

You are dead on with the size of the data in a single keyspace. I hit this same issue when working on the openPDC project for the NERC:

http://opendpdc.codeplex.com

One sensor could have literally billions of points in a very short amount of time, so for prototype jobs we keyed things to a single day (3,600,000ms):

https://openpdc.svn.codeplex.com/svn/Hadoop/Current%20Version/src/TVA/Hadoop/MapReduce/Datamining/SAX/SlidingClassifier_1NN_Euc.java

In a more complex version I would have used overlapping time slots so the mapper would get sufficient data from adjacent keyspaces to cover a single window length. For now I’d say you are on the right track with the duplicate values.

Josh

Ashwin Jayaprakash / April 11, 2012 / 4:54 PM

I know this is not related to moving averages, but how accurate was the SAX time series matching used in PDC?

Jacob / April 28, 2013 / 11:59 AM

Hi,

I implemented something like this (except using the MapReduce API 2), and in the loop of the reduce() function, whenever the .next() method is called on the Iterator, we get a new value, but the _key_ also miraculously changes. Rather, the part of the composite key that was not used as a natural key (the timestamp in this example) changes. This was quite surprising. How does this happen?

Thanks