**UPDATED 20130424:** The new RHadoop treats output to Streaming a bit differently, so `do.trace=FALSE`

must be set in the `randomForest`

call.

**UPDATED 20130408:** Antonio Piccolboni, the author of RHadoop, has improved the code somewhat using his substantially greater experience with R. The most material change is that the latest version of RHadoop can bind multiple calls to keyval correctly.

Internet-scale data sets present a unique challenge to traditional machine-learning techniques, such as fitting random forests or “bagging“. In order to fit a classifier to a large data set, it’s common to generate many smaller data sets derived from the initial large data set (i.e.,resampling). There are two reasons for this:

- Large data sets typically live in a cluster, so any operations should have some level of parallelism. Separate models are fit on separate nodes that contain different subsets of the initial data.
- Even if you could use the entire initial data set to fit a single model, it turns out that ensemble methods, where you fit multiple smaller models using subsets of the data, generally outperform single models. Indeed, fitting a single model with 100M data points can perform
**worse**than fitting just a few models with 10M data points each (so less total data outperforms more total data; e.g., see this paper).

Furthermore, bootstrapping is another popular method that randomly chops up an initial data set in order to characterize distributions of statistics and also to build ensembles of classifiers (e.g., bagging). In fact, parallelizing bootstrap sampling or ensemble learning can provide significant performance gains even when your data set is not so large that it must live in a cluster. The gains from purely parallelizing the random number generation are still significant.

Sampling-with-replacement is the most popular method for sampling from the initial data set to produce a collection of samples for model fitting. This method is equivalent to sampling from a multinomial distribution where the probability of selecting any individual input data point is uniform over the entire data set. *Unfortunately, it is not possible to sample from a multinomial distribution across a cluster without using some kind of communication between the nodes (i.e., sampling from a multinomial is not embarrassingly parallel)*. But do not despair: we can approximate a multinomial distribution by sampling from an identical Poisson distribution on each input data point *independently*, lending itself to an embarrassingly parallel implementation.

Below, we will show you how to implement such a Poisson approximation to enable you to train a random forest on an enormous data set. As a bonus, we’ll be implementing it in R and RHadoop, as R is many people’s statistical tool of choice. Because this technique is broadly applicable to any situation involving resampling a large data set, we begin with a fully general description of the problem and solution.

## Formal Problem Statement

Our situation is as follows:

- We have
*N*data points in our initial training set {*x*}, where_{i}*N*is very large (10^{6}–10^{9}) and the data is distributed over a cluster. - We want to train a set of
*M*different models for an ensemble classifier, where*M*is anywhere from a handful to thousands. - We want each model to be trained with
*K*data points, where typically*K << N*. (For example,*K*may be 1–10% of*N.*)

The number of training data points available to us, *N*, is fixed and generally outside of our control. However, *K* and *M* are both parameters that we can set and their product *KM* determines the total number of input vectors that will be consumed in the model fitting process. There are three cases to consider:

*KM < N*, in which case we are not using the full amount of data available to us.*KM = N*, in which case we can exactly partition our data set to produce totally independent samples.*KM > N*, in which case we must resample some of our data with replacement.

The Poisson sampling method described below handles all three cases in the same framework. (However, note that for the case *KM = N*, it does not partition the data, but simply resamples it as well.)

(Note: The case where *K = N* corresponds exactly to bootstrapping the full initial data set, but this is often not desired for very large data sets. Nor is it practical from a computational perspective: performing a bootstrap of the full data set would require the generation of *MN* data points and *M* scans of an *N*-sized data set. However, in cases where this computation is desired, there exists an approximation called a “Bag of Little Bootstraps”.)

So our goal is to generate *M* data sets of size *K* from the original *N* data points where *N* can be very large and the data is sitting in a distributed environment. The two challenges we want to overcome are:

- Many resampling implementations perform
*M*passes through the initial data set. which is highly undesirable in our case because the initial data set is so large. - Sampling-with-replacement involves sampling from a multinomial distribution over the
*N*input data points. However, sampling from a multinomial distribution requires message passing across the entire data set, so it is not possible to do so in a distributed environment in an embarrassingly parallel fashion (i.e., as a map-only MapReduce job).

## Poisson-approximation Resampling

Our solution to these issues is to approximate the multinomial sampling by sampling from a Poisson distribution for each input data point separately. For each input point *x _{i}*, we sample

*M*times from a Poisson(

*K / N*) distribution to produce

*M*values {

*m*}, one for each model

_{j}*j*. For each data point

*x*and each model

_{i}*j*, we emit the key-value pair

*<j, x*a total of

_{i}>*m*times (where

_{j}*m*can be zero). Because the sum of multiple Poisson variables is Poisson, the number of times a data point is emitted is distributed as Poisson(

_{j}*KM / N*), and the size of each generated sample is distributed as Poisson(

*K*), as desired. Because the Poisson sampling occurs for each input point independently, this sampling method can be parallelized in the map portion of a MapReduce job.

(Note that our approximation never guarantees that every single input data point is assigned to at least one of the models, but this is no worse than multinomial resampling of the full data set. However, in the case where *KM = N*, this is particularly bad in contrast to the alternative of partitioning the data, as partitioning will guarantee independent samples using all *N* training points, while resampling can only generate (hopefully) uncorrelated samples with a fraction of the data.)

Ultimately, each generated sample will have size *K* *on average*, and so this method will approximate the exact multinomial sampling method with a *single pass* through the data in an embarrassingly parallel fashion, addressing both of the big data limitations described above. Because we are randomly sampling from the initial data set, and similarly to the “exact” method of multinomial sampling, it’s possible that some of the initial input vectors will never be chosen for any of the samples. In fact, we expect that approximately exp{–*KM / N*} of the initial data will be entirely missing from any of the samples (see figure below).

**Amount of missed data as a function of KM / N. The value for KM = N is marked in gray.**

Finally, the MapReduce shuffle distributes all the samples to the reducers and the model fitting or statistic computation is performed on the reduce side of the computation.

The algorithm for performing the sampling is presented below in pseudocode. Recall that there are three parameters — *N*, *M*, and *K* — where one is fixed; we choose to specify *T = K / N* as one of the parameters as it eliminates the need to determine the value of *N* in advance.

1 2 3 4 5 6 7 8 9 10 11 12 13 |
# example sampling parameters T = 0.1 # param 1: K / N; average fraction of input data in each model; 10% M = 50 # param 2: number of models def map(k, v): // for each input data point for i in 1:M // for each model m = Poisson(T) // num times curr point should appear in this sample if m > 0 for j in 1:m // emit current input point proper num of times emit (i, v) def reduce(k, v): fit model or calculate statistic with the sample in v |

Note that even more significant performance enhancements can be achieved if it is possible to use a combiner, but this is highly statistic/model-dependent.

## Example: Kaggle Data Set on Bulldozer Sale Prices

We will apply this method to test out training of a random forest regression model on a Kaggle data set found here. The data set comprises ~400k training data points. Each data point represents a sale of a particular bulldozer at an auction, for which we have the sale price along with a set of other features about the sale and about the bulldozer. (This data set is not especially large, but will illustrate our method nicely.) The goal will be to build a regression model using an ensemble method (specifically, a random forest) to predict the sale price of a bulldozer from the available features.

**Could be yours for $141,999.99**

The data are actually supplied as two tables: a transaction table that includes the sale price (target variable) and some other features, including a reference to a specific bulldozer; and a bulldozer table, that contains additional features for each bulldozer. As this post does not concern itself with data munging, we will assume that the data come pre-joined. But in a real-life situation, we’d incorporate the join as part of the workflow by, for example, processing it with a Hive query or a Pig script. Since in this case the data are relatively small, we simply use some R commands. The code to prepare the data can be found here.

## Quick Note on R and RHadoop

As so much statistical work is performed in R, it is highly valuable to have an interface to use R over large data sets in a Hadoop cluster. This can be performed with RHadoop, which is developed with the support of Revolution Analytics. (Another option for R and Hadoop is the RHIPE project.)

One of the nice things about RHadoop is that R environments can be serialized and shuttled around, so there is never any reason to explicitly move any side data through Hadoop’s configuration or distributed cache. All environment variables are distributed around transparently to the user. Another nice property is that Hadoop is used quite transparently to the user, and the semantics allow for easily composing MapReduce jobs into pipelines by writing modular/reusable parts.

The only thing that might be unusual for the “traditional” Hadoop user (but natural to the R user) is that the mapper function should be written to be fully vectorized (i.e., `keyval()`

should be called once per mapper as the last statement). This is to maximize the performance of the mapper (since R’s interpreted REPL is quite slow), but it means that mappers receive multiple input records at a time and everything the mappers emit must be grouped into a single object.

Finally, I did not find the RHadoop installation instructions (or the documentation in general) to be in a very mature state, so here are the commands I used to install RHadoop on my small cluster.

## Fitting an Ensemble of Random Forests with Poisson Sampling on RHadoop

We implement our Poisson sampling strategy with RHadoop. We start by setting global values for our parameters:

1 2 |
frac.per.model <- 0.1 # 10% of input data to each sample on avg num.models <- 50 |

As mentioned previously, the mapper must deal with multiple input records at once, so there needs to be a bit of data wrangling before emitting the keys and values:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
# MAP function poisson.subsample <- function(k, input) { # this function is used to generate a sample from the current block of data generate.sample <- function(i) { # generate N Poisson variables draws <- rpois(n=nrow(input), lambda=frac.per.model) # compute the index vector for the corresponding rows, # weighted by the number of Poisson draws indices <- rep((1:nrow(input)), draws) # emit the rows; RHadoop takes care of replicating the key appropriately # and rbinding the data frames from different mappers together for the # reducer keyval(i, input[indices, ]) } # here is where we generate the actual sampled data c.keyval(lapply(1:num.models, generate.sample)) } |

Because we are using R, the reducer can be incredibly simple: it takes the sample as an argument and simply feeds it to our model-fitting function, `randomForest()`

:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
# REDUCE function fit.trees <- function(k, v) { # rmr rbinds the emited values, so v is a dataframe # note that do.trace=T is used to produce output to stderr to keep # the reduce task from timing out rf <- randomForest(formula=model.formula, data=v, na.action=na.roughfix, ntree=10, do.trace=FALSE) # rf is a list so wrap it in another list to ensure that only # one object gets emitted. this is because keyval is vectorized keyval(k, list(forest=rf)) } |

Keep in mind that in our case, we are actually fitting 10 trees per sample, but we could easily only fit a single tree per “forest”, and merge the results from each sample into a single real forest.

Note that the choice of predictors is specified in the variable `model.formula`

. R’s random forest implementation does not support factors that have more than 32 levels, as the optimization problem grows too fast. For the purpose of illustrating the Poisson sampling method, we chose to simply ignore those features, even though they probably contain useful information for regression. In a future blog post, we will address various ways that we can get around this limitation.

The MapReduce job itself is initiated like so:

1 2 3 4 5 |
mapreduce(input="/poisson/training.csv", input.format=bulldozer.input.format, map=poisson.subsample, reduce=fit.trees, output="/poisson/output") |

and the resulting trees are dumped in HDFS at `/poisson/output`

.

Finally, we can load the trees, merge them, and use them to classify new test points:

1 2 |
raw.forests <- values(from.dfs("/poisson/output")) forest <- do.call(combine, raw.forests) |

Each of 50 samples produced a random forest with 10 trees, so the final random forest is an ensemble of 500 trees, fitted in a distributed fashion over a Hadoop cluster. The full set of source files is available here.

Hopefully, you have now learned a scalable approach for training ensemble classifiers or bootstrapping in a parallel fashion by using a Poisson approximation to multinomial sampling.

*Uri Laserson (@laserson) is a data scientist at Cloudera.*

Hi, I am a developer on the RHadoop project. First thanks for this very interesting example. Then two notes. First is that the somewhat complex “reshaping” at the end of the mapper will be captured by a new API function, c.keyval, which will be available in rmr2.1. As far as the installation, the variety of platforms on which users are installing RHadoop is an obstacle to making installation easier, I think some software tools will be necessary. I am interested in any other feedback to help get the docs to the next level.

Pingback: Poisson Tails | Eventually Almost Everywhere

Pingback: How-to: Resample from a large data set in parallel (with R on Hadoop) ← Data Mining Course

Hi Uri, thanks for sharing this great article! Regarding your mention about the R limit of 32 levels per factor, do you have a new blog post yet to explain a workaround, along with example code? Thanks in advance!

Hi James, this is still in the works, but generally the solution is to find another representation for your features. Here are two examples:

http://stats.stackexchange.com/questions/49243/rs-randomforest-can-not-handle-more-than-32-levels-what-is-workaround

http://stackoverflow.com/questions/8774403/r-sampling-to-get-around-randomforest-32-factor-limit

Hi Uri, many thanks again for your advice to work around the R limit of 32 levels per factor. We’re now much farther along experimenting with your example code and our dataset, but then ran into the same problem you encountered earlier, as listed in your run session log

https://gist.github.com/laserson/5341707/raw/27e550fe15e8217f1e94ebb2c9581011dfa09aff/gistfile1.txt

Can you please explain how you fixed this error, and what is the root cause? Appreciate your reply here or to my email address. Thanks again and look forward to your reply!

Hi James, in the actual call to randomForest, set do.trace=FALSE. The newer version of RHadoop deals a bit differently with writing to stdout/stderr, so having do.trace=TRUE confuses the Streaming framework. (Thanks again to Antonio Piccolboni for this solution.) There is a potential problem in which the reduce tasks may time out, but it worked fine for me. I will change the code shortly to reflect this issue.

Pingback: Google Summer of Code 2013 – Project Proposal YuFu | All About Features

Hi Uri,

Thanks, interesting post. I am not very familiar with RHadoop, but do I get it right that each tree in the forest is built in a single reduce task? I assume then K is limited by the memory available for the given reduce task and it strongly depends on RHadoop’s memory handling and of course the number of attributes. How do you think this limited K affects accuracy? Or do K=40000 produce trees that are already good enough? Have you made any accuracy comparisons? Thanks in advance!

Zoltan, you may be interested in a paper by Jordan et al on the Bag of Little Bootstraps http://arxiv.org/abs/1112.5016

Hi when I run the following command:

forest <- do.call(combine, raw.forests)

I run into this error :

"Error in rf$votes + ifelse(is.na(rflist[[i]]$votes), 0, rflist[[i]]$votes) : non-conformable arrays

In addition: Warning message:

In rf$oob.times + rflist[[i]]$oob.times :

longer object length is not a multiple of shorter object length"

any solution to this?

Submitted 3 issues in github

a) reducer task stuck at 67%

https://github.com/cloudera/poisson_sampling/issues/3

b) running fitRandomForrest with small input data sample results in the exception (M/R terminates)

https://github.com/cloudera/poisson_sampling/issues/4

c) Not an issue per say as the M/R succeeds but with much reduced input training data sethttps://github.com/cloudera/poisson_sampling/issues/5

Last issue is really a question, even though M/R job succeeds, it pertains to scalability of the solution/framework (M/R with R executing random forest fitness algorithm)

copying just last “issue” for reference, this looks like a great way to address sampling but its also uncertain as to stability of the solution…

inally, setting number of input rows to 1000, in joinData.R

transactions <- read.table(file="../downloads/train.csv",

nrows=1000,

#nrows=20,

…

allows fitRandomForrest.R M/R job to succeed (randomForrest trees are created)

hence it might be question of resources, or limits in rmr2 lib itself

from the stability point of view, would like to find out why the small # of samples cause R M/R to terminate with an exception

and also, what if sample input data set is very large much larger than provided in the example, what are operational boundaries of this framework

Thanks and appreciate and feedback/advice!

Want to also note that seeing one reduce task being created, tried setting D=”mapred.reduce.tasks=10″ in the mapred function

however this caused only 2 reduce tasks to be created (2 R processes) this is still way too small of the number, it won’t scale…

24497 mapred 20 0 667m 481m 4284 R 97.8 12.6 4:58.21 R

24492 mapred 20 0 667m 481m 4284 R 97.1 12.6 4:43.82 R

this is hence looking less and less as rmr2 but a combination of theoretical underpinnings of random forest / input data entropy/structure

the issues above has been resolved (fundamental error in the mapred configuration)

https://github.com/cloudera/poisson_sampling/issues/6#issuecomment-25478103

I am also getting the error mentioned by Anuja above and notice that it has not been answered yet. I am getting below error while running :

> forest <- do.call(combine, raw.forests)

Error in rf$votes + ifelse(is.na(rflist[[i]]$votes), 0, rflist[[i]]$votes) :

non-conformable arrays

In addition: Warning message:

In rf$oob.times + rflist[[i]]$oob.times :

longer object length is not a multiple of shorter object length