# Bayesian Machine Learning on Apache Spark

- by Mert Terzihan
- August 27, 2014
- no comments

**Markov Chain Monte Carlo methods are another example of useful statistical computation for Big Data that is capably enabled by Apache Spark.**

During my internship at Cloudera, I have been working on integrating PyMC with Apache Spark. PyMC is an open source Python package that allows users to easily apply Bayesian machine learning methods to their data, while Spark is a new, general framework for distributed computing on Hadoop. Together, they provide a scalable framework for scalable Markov Chain Monte Carlo (MCMC) methods. In this blog post, I am going to describe my work on distributing large-scale graphical models and MCMC computation.

## Markov Chain Monte Carlo Methods

MCMC methods are a set of widely-used algorithms in Bayesian inference. These methods mainly aim to approximate an intractable posterior function by random sampling, and their applications can be found in fields like physics, system simulation, econometrics, and machine learning. Andrieu et al. [2] describe how MCMC is useful for computing integrals or optimizing functions in large-dimensional spaces, which in the context of machine learning translates into parameter estimation, prediction, and model selection. As the community is transitioning from processing medium-scale data to Big Data, being able to analyze large dimensional data is becoming more significant than ever.

There are ongoing efforts in academia to implement MCMC methods in distributed and parallel settings. During my internship at Cloudera, I tried to adapt the PyMC framework to Spark in order to run multiple MCMC chains on distributed data, while maintaining PyMC’s convenient abstractions for computing on probabilistic graphical models (PGMs).

In lieu of covering the theoretical background of MCMC methods, below are a few useful resources for interested readers:

- Bayesian Reasoning and Machine Learning by David Barber has a chapter on Approximate Sampling
- Christophe Andrieu et al. have written an introductory tutorial (pdf) on MCMC methods that covers most of the MCMC algorithms
- Dr. Daphne Koller offers an online course on Coursera, Probabilistic Graphical Models, which also covers the Gibbs Sampler and the Metropolis-Hastings Algorithm
- Dr. A. Taylan Cemgil has prepared very useful lecture notes (pdf) for his Monte Carlo methods course

## PyMC

PyMC is a widely used Python package that provides a library of tools for defining Bayesian statistical models and applying approximate inference techniques on them. It has built a well designed abstraction for defining statistical models and analyzing the sampled data after the sampling process.

To introduce the PyMC user API, we will follow the great introductory example provided in PyMC’s User’s Guide: analyzing the coal mining dataset [1]. The dataset contains the number of coal-mining disasters in England from 1851 to 1962. What we would like to find out from this dataset is the year where a dramatic change had occurred in the number of deaths caused by these disasters.

This dataset is composed of the number of events (deaths from coal mine disasters) that occur each year in the range. It is natural to model the probability distribution of the number of events with a Poisson distribution. The switchpoint, which is assumed to be distributed uniformly along the year domain, divides the data into two regions where each regions uses a different mean for its Poisson distribution. Furthermore, it is assumed that both of the means follow an Exponential distribution.

Defining the parameters of the model is easy and straightforward (the original code is in disaster_model.py):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
from pymc import * from numpy import array, empty from numpy.random import randint disasters_array = array([4, 5, 4, 0, 1, 4, 3, 4, 0, 6, 3, 3, 4, 0, 2, 6, 3, 3, 5, 4, 5, 3, 1, 4, 4, 1, 5, 5, 3, 4, 2, 5, 2, 2, 3, 4, 2, 1, 3, 2, 2, 1, 1, 1, 1, 3, 0, 0, 1, 0, 1, 1, 0, 0, 3, 1, 0, 3, 2, 2, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 2, 1, 0, 0, 0, 1, 1, 0, 2, 3, 3, 1, 1, 2, 1, 1, 1, 1, 2, 4, 2, 0, 0, 1, 4, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1]) switchpoint = DiscreteUniform( 'switchpoint', lower=0, upper=110, doc='Switchpoint[year]') early_mean = Exponential('early_mean', beta=1.) late_mean = Exponential('late_mean', beta=1.) @deterministic(plot=False) def rate(s=switchpoint, e=early_mean, l=late_mean): ''' Concatenate Poisson means ''' out = empty(len(disasters_array)) out[:s] = e out[s:] = l return out disasters = Poisson('disasters', mu=rate, value=disasters_array, observed=True) |

The model defines four stochastic variables

`switchpoint`

: the year that the change occurred; Uniform distribution`early_mean`

: the average deaths per year before the change; Exponential distribution`late_mean`

: the average deaths per year after the change; Exponential distribution`disasters`

: this is the variable that generates the disasters data conditioned on`switchpoint`

,`early_mean`

, and`late_mean`

; Poisson distribution

and a single deterministic variable, rate, which takes on the value of the early or late rate value depending on the `switchpoint`

value.

After defining the model, we can learn the posterior distribution for `switchpoint`

by sampling from the model:

1 2 |
m = MCMC([disasters, rate, switchpoint, early_mean, late_mean]) m.sample(500, burn=100) |

The Python code above performs the inference by using an MCMC algorithm to draw 500 samples from our model (where the first 100 samples are ignored as a “burn-in period”). We can view the samples like so:

1 |
m.trace('switchpoint')[:] |

Although PyMC holds the sampled values in memory by default, it also provides other “backend” options: “txt”, “pickle”, “sqlite” and “hdf5″. For instance, one can save the sampled data to a text file:

1 2 3 |
m = MCMC([disasters, rate, switchpoint, early_mean, late_mean], db='txt', dbname='path/to/text_file') m.sample(500, burn=100) |

The best way to dive deeper into PyMC is to read the package’s User’s Guide. Additionally, the examples folder in the source code contains very useful introductory code to start with.

## Apache Spark

Apache Spark is a popular distributed computing framework that was started as a research project at UC Berkeley’s AMPLab [6]. The most prominent difference between Spark and the MapReduce paradigm is that Spark can keep the data in distributed memory. Because MCMC methods consist of iterative computations, Spark would be the most suitable platform for running MCMC jobs in parallel.

For further information, Sean Owen’s blog post explains why Spark is a great platform for data scientists.

## PyMC on Spark

By its design, PyMC runs on a single computer, which limits the size of the data that can be analyzed. This summer, I’ve been working on integrating PyMC and Spark, and providing users a PyMC-based abstraction to run their MCMC algorithms distributed on Spark. The development version of this project can be found under the public repository pymc.

In order to accomplish the above goal, I started with implementing another backend option, HDFS, to enable users to save their traces to HDFS as txt files. The only dependency for the HDFS backend is PyWebHDFS, which can be installed via pip.

If we follow the disaster model example, we can save the results to HDFS as follows:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import pymc as pm # an alternative way to load the same model: from pymc.examples import disaster_model # The HDFS path to save the traces (without the leading '/') dbname = 'user/test/results' # HDFS Configuration host = 'localhost' port = '50070' user_name = 'test' # Create and save data to HDFS M = pm.MCMC(disaster_model, db='hdfs', dbname=dbname, host=host, port=port, user_name=user_name) M.sample(100) M.db.close() # Load data from HDFS db = pm.database.hdfs.load(dirname=dbname, host=host, port=port, user_name=user_name) print db.trace('early_mean')[:] |

In the above example, computation is done on a local machine and the data is sent over the network to HDFS for reads and writes. Instead, we would like to achieve distributed computation for big models that can’t fit into or would be slow to run on a single machine (MCMC is a collection of methods which are more CPU bound). Therefore, I have implemented `DistributedMCMC`

, a class which fits distributed models, along with `distributed_spark`

, a backend class for storing the samples (analogous to other PyMC backends).

The `DistributedMCMC`

class works by running MCMC chains locally on partitions of data with periodic global synchronizations. It can be instantiated with custom

- PyMC models
- Step methods
- Data-preprocessing functions to avoid recomputing derived data on each iteration
- Factory functions that create broadcasted global parameters
- Storage functions for serializing sampled data to HDFS rather than storing them in memory (vital for large sampled data)

The purpose of the global parameter is to enable the executors to remain synchronized across nodes and to ensure that the samples are being drawn from the true posterior. The number of local steps to take before every synchronization is also parameterized as `local_iter`

. Since the sampling procedure is paused to synchronize the distributed models, a global update is an expensive operation. Therefore, the user should set the `local_ite`

r as high as their model allows, though most of the distributed models require frequent synchronizations.

To illustrate the distributed model better, let’s review an example.

## Topic Modeling with MCMC

Given a set of documents and number of topics, topic modeling is a collection of statistical modeling algorithms that clusters the documents according to their topics. Examples of such models are probabilistic latent semantic indexing, non-negative matrix factorization, latent Dirichlet allocation (LDA), and hierarchical Dirichlet processes. The idea behind topic modeling is to consider topics as a probability distribution over words and documents as a mixture of topics. In this section, I will dive into the LDA model I implemented with PyMC and Spark. Feel free to skip ahead to the next section.

##### Latent Dirichlet Allocation

LDA was first proposed in a paper by David M. Blei, Andrew Y. Ng, and Michael I. Jordan in 2003 [3]. As defined in the paper, the terms are:

- φ
_{t,w}is the probability of topic t generating word w - θ
_{d,t}is the probability of sampling topic t for document d - x
_{d,i}is the ith word in the d^{th}document - z
_{d,i}is the topic assignment for the i^{th}word in the dM^{th}document

LDA is a generative model which assumes the following generative process:

- φ
_{t}∝ Dirichlet(β) - θ
_{d}∝ Dirichlet(α) - z
_{d,i}∝ Categorical(θ_{d}) - x
_{d,i}∝ Categorical(φ_{z(d,i)})

Since LDA is a generative model, one can model a corpus of documents as well as generate some documents given the parameters.

For a more detailed interpretation of LDA, Probabilistic Topic Models by Mark Steyvers and Tom Griffiths is a very informative article.

##### Collapsed Gibbs Sampling

We can apply any MCMC sampling algorithm to a corpus of documents to extract the hidden topics, however a typical LDA model contains large numbers of nodes and this might affect the convergence rate. In order to prevent slow convergence, Steyvers and Griffiths [4] have proposed a Collapsed Gibbs Sampler for LDA, where the latent variables φ and θ are integrated out and the topic for each word in the corpus is sampled conditioned on all other variables. Therefore, instead of regular Metropolis-Hastings method, I have implemented the collapsed Gibbs sampler for the LDA model. Griffiths and Steyvers have showed [3] the conditional probability of sampling topic t as z_{d,j} given all other variables is:

where

is a matrix of word-topic counts excluding the current instance, and on the other hand,

is a matrix of document-topic counts not including the current topic assignment.

##### Approximate Distributed LDA

The approximate distributed LDA (AD-LDA) model was first proposed by Newman et al. [5] and takes advantage of the weak dependency between topic assignments to different words to distribute the Gibbs sampling updates. After partitioning the corpus across the executors, the algorithm is composed of two parts:

- Sample topics locally on each executor using the collapsed Gibbs sampler
- Synchronize the word-topic counts

We don’t synchronize document-topic counts because each document is local to a single executor. On the other hand, the same vocabulary is being used by all executors, so they need to synchronize the word-topic counts after a specified number of iterations of the local Gibbs sampler. The synchronization is being performed by combining word-topic counts in a reducer (as illustrated in [5]), followed by broadcasting the updated word-topic counts to the executors using the SparkContext.broadcast() method.

## Distributed LDA on Spark with PyMC

I’ve implemented the AD-LDA algorithm in the CollapsedDistributedLDA module. Because the code is involved, I will only outline the structure here.

The code begins by instantiating a `DistributedMCMC`

object. The supplied model_function will be run on each executor, returning an `MCMC`

object that locally runs the `MCMC`

using the instantiated stochastic variable, z (initialized randomly). The supplied `step_function`

defines the collapsed Gibbs sampling step method. Synchronization between the executors is achieved with the `global_update`

function, in which a reducer combines the word-topic counts in each of the executors and then distributes the new matrix.

##### NIPS Dataset

We will be extracting topics from the NIPS data set. NIPS is the Conference on Neural Information Processing Systems, focused on machine learning and held annually. Sam Roweis has applied OCR techniques to the scanned papers that were published in the proceedings and made the raw data available here.

I have prepared a Python scriptto preprocess the corpus which filters out the stopwords (words that can be found frequently and does not contain any semantic information, like “a”, “the”, “in”, “on”, etc.), applies lemmatization on words, removes the infrequent words (

##### Results

The experiments were conducted on a cluster with 6 machines, and 12 executors on each of the machines. The `total_partitions`

parameter was set to 72 (6 nodes x 12 executors) to maximize the parallelism of the `DistributedMCMC`

class. As suggested in [4], alpha and beta were set to 0.1 and 0.01 respectively.

A total of 100 topics were extracted using the AD-LDA algorithm. Below are five random topics, each with 10 words that have highest probabilities in the corresponding topic-word distributions, φ:

Topic #14 |
Topic #52 |
Topic #47 |
Topic #78 |
Topic #96 |

mixture |
network |
rule |
radio |
matching |

em |
model |
fuzzy |
objective |
graph |

density |
learning |
cell |
packet |
match |

p |
neural |
extraction |
optimization |
object |

data |
input |
symbolic |
channel |
point |

estimation |
function |
domain |
power |
objective |

likelihood |
set |
knowledge |
signal |
correspondence |

model |
figure |
group |
multiscale |
constraint |

x |
training |
expert |
application |
matrix |

parameter |
system |
shavlik |
scale |
distance |

As it is illustrated in the above table, topics that were formed by LDA algorithm make sense, where topic #14 is about maximum likelihood estimation and topic #52 contains terms related to neural nets.

Other than observing the multinomial distribution over words for topic, one can also compute the similarity between two documents, say document i and document j, by computing the Kullback-Leibler divergence of their corresponding topic distributions, KL(θ_{i}, θ_{j}) [4].

##### Replicating the Experiments

Here are some additional notes for those who want to replicate the experiments and run the AD-LDA model using PyMC on Spark.

- Install NLTK and the WordNet and Stopword corpora.

12import nltknltk.download()and select the relevant corpora from the popup.

- Clone the repository from https://github.com/mertterzihan/pymc(make sure you’re on the pyspark branch, which should be the default) and compile it by running the following commands:

123python setup.py config_fc --fcompiler gfortran buildpython setup.py installpython setupegg.py bdist_eggThe final step creates an egg file that will be distributed to the workers with Spark’s

`broadcast`

method. - Download the NIPS dataset from here and then run preprocess_nips.py to preprocess the raw data. Make sure that
`parent_folder`

and`destination_file`

variables point to the folder that contains raw data and desired output file location, respectively. This script requires NLTK, and WordNet and Stopword corpora to be downloaded. - Make sure you run Spark >= 1.0.2. Earlier versions throw an exception when trying to run PyMC. Check out these instructions for running a custom version of Spark on YARN.
- Inside the PySpark IPython shell, copy and paste CollapsedDistributedLDA. Make sure to change the path variable (location of the NIPS dataset), the egg file path (location of egg file formed after compiling PyMC), and path variable in the
`save_traces`

method. You can change the number of partitions, number of topics, number of local iterations (total iterations of Gibbs sampler between two consecutive global updates), and number of total iterations to obtain various results to find the best set of parameters.

## Future Work

Future work would include implementing other large scale graphical models and sampling algorithms, and experimenting on a wide range of datasets. Moreover, I’ve been in contact with the authors of PyMC regarding contributing the distributed MCMC code to the project.

##### Acknowledgments

I would like to thank my manager Josh Wills, my mentor Uri Laserson, Sean Owen, and Sandy Ryza for the invaluable guidance throughout my project. My internship at Cloudera has been a great experience and I can’t thank enough to them for this wonderful opportunity.

##### References

[1] R. G. Jarrett. A note on the intervals between coal mining disasters. *Biometrika*, 66:191-193, 1979.

[2] C. Andrieu, N. de Freitas, A. Doucet, and M. I. Jordan. An Introduction to MCMC for Machine Learning. *Machine Learning*, 50(1-2): 5-43, January 2003.

[3] D. M. Blei, A. Y. Ng, and M. I. Jordan. Latent Dirichlet Allocation. *Journal of Machine Learning Research*, 3:993-1022, March 2003.

[4] M. Steyvers, and T. Griffiths. *Probabilistic Topic Models. Handbook of Latent Semantic Analysis*, 427(7):424-440, 2007.

[5] D. Newman, A. Asuncion, P. Smyth, and M. Welling. Distributed Inference for Latent Dirichlet Allocation. *Advances in Neural Information Processing Systems*, 1081-1088, 2007.

[6] M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: Cluster Computing with Working Sets. *Proceedings of the 2nd USENIX conference on Hot topics in cloud computing*, 10:10, 2010.

[7] D. Barber. Bayesian Reasoning and Machine Learning. Cambridge University Press, 2012.

*Mert Terzihan is a summer intern on the Data Science team at Cloudera.*

## Filed under:

No Responses