**Learn how Spark facilitates the calculation of computationally-intensive statistics such as VaR via the Monte Carlo method.**

Under reasonable circumstances, how much money can you expect to lose? The financial statistic *value at risk* (VaR) seeks to answer this question. Since its development on Wall Street soon after the stock market crash of 1987, VaR has been widely adopted across the financial services industry. Some organizations report the statistic to satisfy regulations, some use it to better understand the risk characteristics of large portfolios, and others compute it before executing trades to help make informed and immediate decisions.

For reasons that we will delve into later, reaching an accurate estimate of VaR can be a computationally expensive process. The most advanced approaches involve Monte Carlo simulations, a class of algorithms that seek to compute quantities through repeated random sampling. When we can’t derive a closed form for a probability distribution analytically, we can often estimate the shape it takes by repeatedly sampling the simpler processes that compose it and seeing how the results perform in aggregate.

Several Cloudera customers in the financial services industry have independently approached us about calculating VaR as well as doing more general analysis of financial risk on our platform. Working on engagements with a few different institutions, we’ve had the chance to codify some best practices for analyzing risk in the Apache Hadoop ecosystem. In particular, we’ve helped stand up these calculations on Apache Spark, the general-purpose distributed computation framework with support for in-memory analytics that is being rapidly adopted across multiple industries. Some customers have come across Spark on their own, while we’ve recommended it to others and found it to be a good fit.

In this post you will learn the background on VaR and approaches to calculating it, and get a reference implementation for using a Monte Carlo method that leverages Spark to run many simulations in parallel. The post focuses on simplicity over the best possible model, but describe the points where steps can be extended with more complex techniques or domain knowledge.

Example code along with a small sample dataset is available here.

## Background on VaR

VaR is a simple measure of an investment’s risk that tries to provide a reasonable estimate of maximum probable loss over a particular time period. A VaR calculation considers a given confidence level: a VaR of US$1 million with a 95% confidence level means that we believe our investment stands only a 5% chance of losing more than US$1 million over the time period.

A few different methods are employed to calculate VaR:

**Variance-covariance:**The simplest and least computationally expensive approach, the variance-covariance method derives a solution analytically by relying on simplifying assumptions about the probability distributions in the model.**Historical simulation:**Historical simulation tries to directly extrapolate risk from historical data. A drawback of this method is that historical data can be limited and fails to include “what-ifs”. The history we have for the instruments in our portfolio may lack market collapses, but we might wish to model what happens to our portfolio in these situations. Techniques exist for making historical simulations robust to these issues, such as introducing “shocks” into the data, but we won’t cover them in this post.**Monte Carlo simulation:**This method, covered in this post, tries to avoid some of the assumptions in the methods described above. In its most general form, this method:- Defines a relationship between market conditions and each instrument’s returns
- Poses “trials” consisting of random market conditions
- Calculates the portfolio loss for each trial, and uses the aggregated trial data to build up a profile of the portfolio’s risk characteristics.

It is of course worth mentioning that the Monte Carlo method isn’t perfect. The models for generating trial conditions and for inferring instrument performance from them must make simplifying assumptions, and the distribution that comes out won’t be more accurate than these models going in.

## The Model

A Monte Carlo risk model typically phrases each instrument’s return in terms of a set of market factors (such as the value of indexes like the S&P 500, the US GDP, or currency exchange rates). In our simulation, we’ll use a simple linear model: instrument returns are calculated as a weighted sum of the values of the market factors. We choose different weights for each instrument for each market factor. We can fit the model for each instrument with a regression using historical data. On top of this, we’ll allow our instruments to have some optionality – each can be parameterized with a minimum and maximum value. This adds an element of non-linearity that the variance-covariance method has trouble handling, but which the Monte Carlo method can take in stride.

It’s also worth mentioning that we could have chosen a more complicated model, perhaps incorporating domain specific knowledge. While the per-instrument model-fitting step of the computation is a good fit for Spark as well, we’ll leave it out here for the sake of brevity.

Now that we have our model for calculating instrument losses from market factors, we need a process for simulating the behavior of market factors. A simple assumption is that each market factor follows a normal distribution. To capture the fact that market factors are often correlated – when NASDAQ is down, the Dow is likely to be suffering as well – we can use a multivariate normal distribution with a non-diagonal covariance matrix. As above, we could have chosen a more complicated method of simulating the market or assumed a different distribution for each market factor, perhaps one with a fatter tail.

To summarize, trial conditions are drawn from a multivariate normal distribution:

The value of a particular instrument in a particular trial is the dot product of the trial conditions and the instrument’s factor weights*w _{i}* bounded by the instrument’s minimum and maximum value,

*n*and

_{i}*x*:

_{i}The portfolio’s value for the trial is the sum of all instrument values for that trial:

## Running on Spark

A drawback of the Monte Carlo method is its computational intensity; getting accurate results for a large portfolio can require many trials, and simulating each trial can be computationally involved. This is where Spark comes in.

Spark allows you to express parallel computations across many machines using simple operators. A Spark job consists of a set of transformations on parallel collections. We simply pass in Scala (or Java or Python) functions and Spark handles distributing the computation across the cluster. It is also fault tolerant, so if any machines or processes fail while the computation is running, we don’t need to restart from scratch.

A general sketch of our computation looks like:

- Broadcast our instrument data to every node on the cluster. While a large portfolio might consist of millions of instruments, the most memory this should take is in the 10s of gigabytes, which is easily enough to fit into main memory on modern machines.
- Create a parallel collection (RDD) of seeds for our random number generators.
- Create a new parallel collection of portfolio values under random trial conditions by applying a function to each seed that generates a set of random trial conditions, applies them to each instrument to calculate the its value under those conditions, and then sums over all instruments.
- Find the boundary between the bottom 5% of trial values and the rest.
- Subtract the portfolio at this boundary from the current value to find the value at risk.

The following is Scala code for calculating the trial values. The `trialValues`

function takes all our instruments and a number of trials to run, and spits out an array containing the portfolio values for each trial.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
def trialValues(seed: Long, numTrials: Int, instruments: Seq[Instrument], factorMeans: Array[Double], factorCovariances: Array[Array[Double]]): Seq[Double] = { val rand = new MersenneTwister(seed) val multivariateNormal = new MultivariateNormalDistribution(rand, factorMeans, factorCovariances) val trialValues = new Array[Double](numTrials) for (i <- 0 until numTrials) { val trial = multivariateNormal.sample() trialValues(i) = trialValue(trial, instruments) } trialValues } def trialValue(trial: Array[Double], instruments: Seq[Instrument]): Double = { var totalValue = 0.0 for (instrument <- instruments) { totalValue += instrumentTrialValue(instrument, trial) } totalValue } def instrumentTrialValue(instrument: Instrument, trial: Array[Double]): Double = { var instrumentTrialValue = 0.0 var i = 0 while (i < trial.length) { instrumentTrialValue += trial(i) * instrument.factorWeights(i) i += 1 } Math.min(Math.max(instrumentTrialValue, instrument.minValue), instrument.maxValue) } |

Note that we could have written this code in Java or Python as well. The Spark code that runs it on a cluster is:

1 2 3 4 5 6 |
val broadcastInstruments = sc.broadcast(instruments) val seeds = (seed until seed + parallelism) val seedRdd = sc.parallelize(seeds, parallelism) val trialsRdd = seedRdd.flatMap(trialValues(_, numTrials / parallelism, broadcastInstruments.value, factorMeans, factorCovariances)) |

We now have a collection of simulated losses over many trials. To calculate VaR, we want to see what happens at the bottom 5%. We can compute this with:

1 |
val varFivePercent = trialsRdd.takeOrdered(numTrials / 20).last |

Of course, we don’t have to stop at calculating VaR — our simulation contains much more information about the risk characteristics of the portfolio. For example, we can home in on the trials for which our portfolio performs the worst and determine which instruments and market factors are the most to blame. Spark is also useful for computing these rollups and aggregations. We can also use kernel-density estimation to visualize the probability distribution our simulated portfolio values take; the sample repository contains a simple implementation that will soon go into Spark’s built-in machine learning library, MLLib, itself.

Using our toy data with four instruments, three factors, and a million trials, we observe the following risk profile (chart generated by pulling the density results into a spreadsheet):

## Other Considerations

##### Interactive Analysis

So far we’ve presented the computation as a batch job; however, Spark also supports interactive settings. For example, analysts and traders might wish to see what happens when they tweak model parameters, filter the set of instruments considered to those matching some particular criteria, or add a trade that they’re about to execute into the mix. After broadcasting, Spark will keep the set of instruments in memory on every machine in the cluster, making them available for servicing interactive queries. If filtering on particular instrument attributes is routinely done, it might make sense to store the instruments as a map indexed by those attributes.

##### Huge Instrument Data

While it’s rare for a single portfolio to be too large to fit in entirety on every machine, working with huge portfolios composed of instruments like small business loans might require splitting up the portfolio data across machines.

Spark handles this use case as well; here’s a general sketch of how it looks:

1 2 3 4 5 6 7 8 9 10 11 |
val sc = new SparkContext(...) val instrumentsRdd = sc.parallelize(instruments, numPartitions) val fragmentedTrialsRdd = instrumentsRdd.mapPartitions(trialReturns(seed, _, numTrials, modelParameters)) val trialsRdd = fragmentedTrialsRdd.sumByKey() def trialReturns(seed: Long, instruments: Iterator[Instrument], numTrials: Int, modelParameters: YourModelParameters): Iterator[(Int, Double)] = { // Compute the value of the given subset of instruments for all trials, // and emit tuples of (trialId, value) } |

Essentially we invert the parallelism so that each task computes the value of a subset of instruments under every trial, instead of the value of every instrument under a subset of trials.

## Other Distributed Processing Frameworks

Thus far we’ve hyped Spark for distributed processing, without comparing it to other distributed processing frameworks. (For example, why not use MPI and take advantage of the speed of C++?) The primary reason for preferring Spark is the balance it strikes between flexibility and ease of use. Its programming model allows describing parallel computations with far fewer lines of code, and its Scala and Python REPLs allow experimenting with models interactively. Spark combines the potential for rapid prototyping with the strong performance characteristics of the JVM that matter when the time comes to put a model into production. Furthermore, deeper analyses often involve operations on the full instrument-trial loss matrix, which, on decently sized portfolios, can run into the terabytes. A fault-tolerant, data-parallel framework like Spark can handle data at this volume.

That said, only highly optimized native code can squeeze the last bit of juice out of each CPU. Most numerical computations boil down to matrix and vector operations. The bulk of the computation from the model in this post lies in computing dot products — using Java and Python frameworks like JBlas and numpy pushes these linear-algebra operations down to optimized Fortran. Existing C++ libraries can be accessed through JNI.

I’ve elided these optimizations here for the sake of simplicity, but they’re worth considering when building a production application.

## Conclusion

In this post, we’ve demonstrated what it looks like to use Spark for Monte Carlo simulations in financial applications. VaR is the first step at answering the larger question: What is the shape of the probability distribution that governs my investments’ returns, how sensitive are my returns, and to what market factors are they most sensitive? Spark provides a flexible framework for leveraging the processing power of compute clusters to answer these questions.

*Sandy Ryza is a data scientist at Cloudera, a Hadoop committer, and a Spark contributor.*

Definitely looks like the first code sample is cuff off.

Thanks for reporting, this bug was fixed over the weekend.

Hi,

Is the code available so, we can replicate what you’ve done and change it to see how it will perform?

It will be a big help!

Thanks

Hi,

Maybe VaR would be fine on Spark, but I think there is a line where the problem is fundamentally ‘compute grid’ (i.e. data synapse type of soln. rather than ‘data grid’) as soon as you look at valuations themselves then you are definitely in the ‘compute grid’ mode. At least at present!

“The primary reason for preferring Spark is the balance it strikes between flexibility and ease of use. ” Not if you are doing valuations, Spark wouldn’t be chosen as a platform for that at all.

The problem is i) performance of Scala (no where near a C++/GPU) – memory usage and performance isn’t great when sticking to a strictly functional approach. And ii) the amount of memory Spark consumes is quite high at present.

@Michaell For extreme use cases, specialized software and hardware could make sense. I’d challenge your either/or comparison to GPU compute speed, since in fact you can use a GPU from a Spark (or any Java) program. Tight loops need not be written FP in Scala, and JIT to native code anyway. The commodity / general-purpose compute, even with its overheads, could still be the more sensible choice in many contexts.

In the last code snippet, I think it is ‘instrumentsRdd.mapPartitions(trialReturns …’ rather than ‘instrumentsRdd.mapPartitions(trialValues … ‘

I searched the MLlib library in Spark v1.3.1. I couldn’t find this Monte Carlo algorithm :(… Did you added to MLlib?

Melroy,

The algorithm is a little too domain-specific to merit inclusion in MLLib. However, the spark-timeseries library (https://github.com/cloudera/spark-timeseries) contains some of the functionality.

-Sandy

Example from “Adv. Analytics with Spark”, Chapter 9 fails with:

( Running the trials part )

val trials = seedRdd.flatMap(trialReturns(_, numTrials / parallelism, bFactorWeights.value, factorMeans, factorCov))

org.apache.spark.SparkException: Task not serializable

Ranko,

Please raise this issue at the source:

https://github.com/sryza/aas/issues