**Thanks to former Cloudera intern Jose Cambronero for the post below about his summer project, which involved contributions to MLlib in Apache Spark.**

Data can come in many shapes and forms, and can be described in many ways. Statistics like the mean and standard deviation of a sample provide descriptions of some of its important qualities. Less commonly used statistics such as skewness and kurtosis provide additional perspective into the data’s profile.

However, sometimes we can provide a much neater description for data by stating that a sample comes from a given distribution, which not only tells us things like the average value that we should expect, but effectively gives us the data’s “recipe” so that we can compute all sorts of useful information from it. As part of my summer internship at Cloudera, I added implementations to Apache Spark’s MLlib library of various statistical tests that can help us draw conclusions regarding how well a distribution fits data. Specifically, the implementations pertain to the Spark JIRAs SPARK-8598 and SPARK-8884.

In this post, I’ll offer an overview of the first two tests and take the 1-sample variant out for a spin on some simulated data.

## Testing in the Big Data World

The world of small-data analytics has many tools to accomplish this, ranging from quantitative measures to more graphical approaches. R, a popular statistical programming language, provides many of the usual measures out-of-the box, and there are a plethora of packages that provide additional goodness-of-fit tests and visualization tools to aid data scientists in determining whether a sample comes from a given distribution (or isn’t different enough to warrant ruling that out). The same is true for Python using libraries such as SciPy.

However, these tools are rarely able to natively handle the volume of data associated with big-data problems. Unless users want to go with the alternative of testing subsamples of their data, they have to turn to platforms designed for such big data tasks.

Apache Spark’s MLlib can help scratch this itch by providing implementations of various popular statistical measures that can handle large-scale data. For example, the Statistics object in MLlib provides various implementations of the Chi-Square test, both as a test for independence and as a goodness-of-fit test. If you were to collect values and wanted to test these against expected values from a given discrete distribution, in order to draw conclusions about whether the data stems from that distribution, you could use the `ChiSqrTest(Vector, Vector)`

implementation.

As useful and powerful as the Chi-Squared test is with discrete data, applying it to continuous data is inappropriate. Thus there has been a need for continuous distribution goodness-of-fit testing for big-data.

## Null Until Proven Alternate

We might have various distributions that we think fit our data. We now have the task of testing whether our empirical data actually follow these theoretical distributions.

It is often difficult to conclude that a given hypothesis is correct. However, the converse, concluding that a given hypothesis is wrong, can be simpler. This idea might initially seem a bit counterintuitive, but a simple example can clear things up. Let’s say a bad back is a common (clearly not professional!) diagnosis for back pain. If I have back pain, it is possible that I might have a bad back. It is also possible that I simply sat for too long. However, if I don’t have back pain, it is fairly certain that I don’t have a bad back. This drives the underlying intuition behind the concept of a null hypothesis and statistical tests for that hypothesis. In our example, “having a bad back” is our null hypothesis: what we assume until we can prove otherwise, while an analysis of our level of back pain can be seen as a our statistical test, with “no pain” providing strong evidence against our null hypothesis.

We can use this approach to do something similar with our potential distributions. Next, I’ll introduce one such standard statistical test in this blog post: the Kolmogorov-Smirnov test. While “passing” the test doesn’t guarantee that the data comes from that distribution, failing can demonstrate that it does not.

## Statistics: Goodness-of-Fit tests

Kolmogorov-Smirnov might be one of the most popular goodness-of-fit tests for continuous data out there, with an implementation in pretty much every popular analytics platform. The procedure behind the test centers around comparing the largest deviation between the cumulative distribution at a given value X under the theoretical distribution and the empirical cumulative distribution. Empirical cumulative distribution is simply fancy terminology for the realized distribution of our data. Meanwhile, we should view the theoretical cumulative distribution as what we would expect, if our data followed said distribution.

The Kolmogorov-Smirnov statistic for the 1-sample, 2-sided statistic is defined as

where *N* is the size of the sample, *Z* is the sorted sample, and Φ represents the cumulative distribution function for the theoretical distribution that we want to test. So notation aside, we’re doing exactly what we discussed prior: finding the largest difference between realized and expected distribution values.

This general idea is captured by the graphic below. The test tries to capture the largest difference between these two curves. Then, given the distribution of the statistic itself, we can make some claims regarding how likely such a distance would be assuming that the null hypothesis (i.e. that the data comes from the distribution) holds.

One of the main appeals of this test centers around the fact that the test is distribution-agnostic. By that we mean that the statistic can be calculated and compared the same way regardless of the theoretical distribution we are interested in comparing against (or the actual distributions of the one or two samples, depending on the test variant being performed).

## Implementations

##### Distilling distributed distribution (statistics)

Calculating both of these statistics is extremely straightforward when performed in memory. If we have all our data, we can simply traverse it as needed and calculate what we need at each point. Performing the same calculations in a distributed setting can take a bit more craftiness.

The remainder of this blog focuses on explaining implementation details, along with an example for the 1-sample Kolmogorov-Smirnov test.

##### Locals and Globals

The general intuition behind the strategy applied to solve these problems is: we can often arrive at a global value by working with local values. For example, consider the problem of finding the global maximum in the dot plot below. We could keep track of the last seen maximum as we traverse the plot from left to right. But, if the plot were divided into parts, we could just as easily take the maximum of each part (local maximum, and potentially the global maximum) and then compare at the end and pick the winner.

##### 2-sample Kolmogorov-Smirnov test

The 2-sample variant of the Kolmogorov-Smirnov test allows us to test the hypothesis that both samples stem from the same distribution. Its definition is a nice, straightforward extension of the 1-sample variant: we are looking for the largest distance between the two empirical CDF curves. The process for generating the two curves is quite simple. We create a co-sorted group *Z*, and then for each element *Z _{i}* we calculate two values,

*y*and

_{i1}*y*, which are simply the number of elements in each respective sample with a value less than or equal to

_{i2}*Z*, divided by the total number of elements in that sample. This in effect provides us with two empirical CDF curves for

_{i}*Z*assuming we view sample 1 and sample 2 as some sort of “step-wise CDFs”. We can then subtract these two values to compare the distance between the two empirical CDFs at that given point.

Implementing this for a single machine is very easy. The paragraph above effectively gives you the steps. But now let’s think about calculating this in a distributed fashion. In general, when conceiving distributed algorithms, the goal is to minimize the required communication between nodes. In the context of Spark this usually means minimizing the number of operations that induce shuffles. In this case, the algorithm must require at least a single shuffle – there is no getting around the fact that we must union and sort the data, but need we do more?

If we compute the empirical CDFs under each sample within a partition, the values for sample 1 will be off by \(\alpha\) and the values for sample 2 will be off by *β*. However, note that all values for each sample within that partition will be off by that same amount. Since both are off by constants, the vertical distance between them is also off by a constant (*α – β*). The graph below shows this idea for the case of one sample. The two sample case is analogous.

This fact allows us to simply concentrate on finding the largest deviation within each partition, adjusting these later by the appropriate constant *α* is really *α _{i}* for each partition

*i*, similarly for

*β*once we’ve collected the values on the driver, and then compare the results for the maximum distance.

So in the case of the two sample KS test we want each partition to compute:

- The maximum and minimum distance between the local empirical CDFs
- The count of elements in sample 1
- The count of elements in sample 2

We actually combine the last two points into one adjustment constant, to minimize the number of elements we need to keep track of. The diagram below visualizes the steps described prior.

##### Running the Implementation on Simulated OLS Residuals

In this case, we’ve decided to simulate data, from which we then create two very simplistic linear regression models using ordinary least squares (OLS).

If you recall from your statistics classes, OLS imposes various assumptions on the data that you’re trying to model and on the resulting model. To the extent that the assumptions don’t hold, you can end up with poor results, and in some cases you might not even know they’re necessarily poor. One of those restrictions is that the residuals (which are defined as the observed value minus the predicted value) should be *white noise*, which means they follow a normal distribution, are uncorrelated, and have constant variance. Under some circumstances, you can keep your OLS model, even if not all boxes are checked off, but in this case we’ll assume we’re being sticklers.

We’ll begin by generating our data: two regressors, which are combined to create a dependent variable. We can very easily generate a distributed sample from a normal distribution by using Spark’s `RandomRDD`

, which is part of `MLlib`

.

1 2 3 4 5 6 |
val n = 1000000 val x1 = normalRDD(sc, n, seed = 10L) val x2 = normalRDD(sc, n, seed = 5L) val X = x1.zip(x2) val y = X.map { case (xi1, xi2) => 2.5 + 3.4 * xi1 + 1.5 * xi2 } val data = y.zip(X).map { case (yi, (xi1, xi2)) => (yi, xi1, xi2) } |

Once we’ve defined our data, we specify two models. One which is clearly wrong. In turn, this means that the residuals for the incorrect model should not satisfy the white-noise assumption.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// This model specification captures all the regressors that underly // the true process def goodSpec(obs: (Double, Double, Double)) = { val X = Array(1.0, obs._2, obs._3) val y = obs._1 LabeledPoint(y, Vectors.dense(X)) } // this is a bad model def badSpec(obs: (Double, Double, Double)) = { val X = Array(1.0, obs._2, obs._2 * obs._3, obs._3) val y = obs._1 LabeledPoint(y, Vectors.dense(X)) } |

We specified the models as simple functions, since we can then map over the observations RDD and obtain an `RDD[LabeledPoint]`

, which is exactly what MLlib’s linear regression implementation expects.

1 2 3 4 5 6 |
val iterations = 100 val goodModelData = data.map(goodSpec) val goodModel = LinearRegressionWithSGD.train(goodModelData, iterations) val badModelData = data.map(badSpec) val badModel = LinearRegressionWithSGD.train(badModelData, iterations) |

Now all we need are our residuals. We can obtain these easily using the models’ `predict`

function.

1 2 |
val goodResiduals = goodModelData.map(p => p.label - goodModel.predict(p.features)) val badResiduals = badModelData.map(p => p.label - badModel.predict(p.features)) |

In a small-data environment, one of the common ways to start testing the white-noise assumption is to perform various plots of the residuals and see if there are any glaring departures from normality. However, in a big-data world, such tools might not be feasible or necessarily all that useful. We can, however, leverage other measures, such as the 1-sample Kolmogorov-Smirnov statistic. (Note that the 2-sample version of the test is not yet available in Spark 1.5, and will likely be part of a later version.) The function below performs this test for us.

We would like to compare our residuals against the standard normal distribution. To this end, we standardized our residuals first. Alternatively, note that we could have tested the original residuals and simply have provided the distribution parameters (mean and standard deviation) as parameters to our function call.

1 2 3 4 5 6 7 |
def testResiduals(residuals: RDD[Double]): KolmogorovSmirnovTestResult = { val mean = residuals.mean() val sd = residuals.stdev() // standardize using distribution parameters before testing val standardizedResiduals = residuals.map(x => (x - mean) / sd) Statistics.kolmogorovSmirnovTest(standardizedResiduals, "norm") } |

Once we check the test results, we can see that the badly specified model has residuals that do not satisfy the white noise assumption.

1 2 |
println(testResiduals(goodResiduals)) println(testResiduals(badResiduals)) |

Note that the test is meant for continuous distributions, so any ties in ranking will affect the test’s power.

## Conclusion

To see the full code for this example, please visit this post’s github repo.

As usual, all feedback is welcome. Happy hypothesis-testing!

*Jose Cambronero is a student in New York University’s MS in CS program at Courant Institute of Mathematical Sciences.* *His main interests include programming languages (with a strong incline towards functional programming), compiler construction, and natural-language processing.*

very well written. I was just looking into this

I see you have a pull request waiting since august. any idea when they will put it in?

I would prefer to use your code instead of writing one my self :)

Again, thanks for this nice post

Is there an error in the current implementation of kolmogorovSmirnovTest?

See: http://stackoverflow.com/questions/37335408/kolmogorov-smirnov-test-in-spark-python-not-working