Deep Learning with Intel’s BigDL and Apache Spark


Cloudera recently published a blog post on how to use Deeplearning4j (DL4J) along with Apache Hadoop and Apache Spark to get state-of-the-art results on an image recognition task. Continuing that stream of work, in this post we discuss a viable alternative that is specifically designed for Spark and for data available in Spark and Hadoop clusters, accessible via Scala or Python APIs.

The deep learning landscape is still evolving. On one hand we have veteran frameworks like Theano and Caffe, or more popular ones like TensorFlow, while on the other we see the emergence of JVM-based frameworks that can perform distributed deep learning using GPUs and CPUs. These JVM-based tools can leverage existing Spark clusters to parallelize the training of models. This post discusses Intel’s BigDL, an open source distributed deep learning framework for big data platforms built on Apache Spark. It highlights some high-level differences between Deeplearning4j and BigDL, details how to build a Convolutional Neural Network (CNN) while exploring a different approach to transfer learning (fine-tuning pre-trained models), and provides suggestions on troubleshooting and tuning Apache Spark jobs.

Comparing BigDL and Deeplearning4j

Intel’s BigDL is a relatively new open source framework that provides native support for deep learning on Spark using CPUs. On the other hand, Deeplearning4j was created in 2014, is built for the JVM, and allows training deep neural networks on CPUs and GPUs. A direct comparison may not be entirely fair, given their different life spans and the fact that both are still under active development, but comparing features can help us understand when one or both are applicable to a particular problem.



| Feature | BigDL | Deeplearning4j |
| --- | --- | --- |
| Framework & APIs | A comparatively new deep learning framework, modeled after Torch. Supports Scala and Python APIs. | Has been around for three years as a JVM-based deep learning project. Supports Scala and Java APIs. |
| Chip(s) supported | Supports distributed deep learning on commodity CPUs, not GPUs. Works on any x86_64 hardware via Intel’s MKL, which is specially designed for Intel hardware. | Supports distributed deep learning on CPUs and GPUs. |
| Pre-trained model support | Supports models created in the TensorFlow, Caffe, or Torch frameworks. | Supports models created in the Caffe, Torch, Theano, and TensorFlow frameworks. Also has a new native model zoo library that lets the user automatically download and instantiate a pre-trained model. |
| Linear algebra library | Uses Intel’s MKL (Math Kernel Library). | Uses the ND4J native linear algebra library. By default OpenBLAS is used with ND4J, but there is an option to use Intel’s MKL. |
| User experience | Designed to be used along with Apache Spark, which is a plus for existing Spark users. | Primarily built for the JVM, so Java developers have been a primary focus. Good documentation and resources to get started, plus a Gitter support channel for questions. |
| Distributed deep neural network training on Spark | Supports training with CPUs. Employs synchronous parameter averaging, where Spark’s Block Manager serves as the parameter server, so the driver does not become a communication bottleneck. Parameter averaging occurs once a mini-batch has been processed by all the workers. The mini-batch size is expected to be a multiple of the total cores used in the job. | Supports training with CPUs and GPUs. Employs synchronous parameter averaging, where the driver serves as the parameter server. Recently introduced a thresholding approach that reduces network communication requirements by orders of magnitude [1]. The user can choose the parameter averaging frequency, and there are no such restrictions on the batch size. |
| ETL & traditional machine learning | Users can employ Apache Spark and its ML/MLlib algorithms. | Supports ETL through DataVec and some core NLP functionality using ClearTK. Users can implement their own machine learning algorithms using ND4J. |
| Visualization | Supports visualization through TensorBoard. | Provides its own web UI tool for visualizing statistics such as training loss across iterations. |
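To make the parameter-averaging rows above concrete, here is a toy sketch of the idea (illustrative only, not either framework's API): each worker finishes its mini-batch shard with its own copy of the weights, and the parameter server replaces every copy with their element-wise mean.

```python
# Toy sketch of synchronous parameter averaging (not BigDL/DL4J code).
# After each mini-batch, every worker's weight vector is replaced by the
# element-wise average across all workers.

def average_parameters(worker_weights):
    """Average a list of weight vectors element-wise."""
    n = len(worker_weights)
    length = len(worker_weights[0])
    return [sum(w[i] for w in worker_weights) / n for i in range(length)]

# Three workers finish a mini-batch with slightly diverged weights.
workers = [
    [1.0, 4.0, 6.0],
    [3.0, 2.0, 6.0],
    [2.0, 0.0, 6.0],
]
averaged = average_parameters(workers)
print(averaged)  # [2.0, 2.0, 6.0]
```

In both frameworks this averaging is synchronous: every worker must finish its shard of the mini-batch before the averaged weights are broadcast back.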

Classifying Caltech-256 Images with BigDL

To further the comparison, below we present a worked example with BigDL that mirrors the previous DL4J example. That is, it walks through the steps to build a convolutional neural network that can classify images in the Caltech-256 dataset.

The Caltech-256 dataset was downloaded, separated into train and validation sets using stratified sampling, and stored in HDFS (full instructions). We begin by reading the JPEG files from HDFS as binary files and then decoding the compressed format into the ByteRecord format required by BigDL. As with any other machine learning workload, we may want to pre-process the data before feeding it into the model. In the current case, we chain three transformations using the Transformer APIs.

For computational efficiency, we train and infer on data in batches, in this case 16 images at a time. The mini-batch size must be a multiple of the total number of cores in the job. All of this has a role to play in how the training process is conducted within BigDL.
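As a rough illustration of such a chained pre-processing pipeline (the names are hypothetical stand-ins, not BigDL's Transformer API), one can compose per-sample transformations and check the batch-size constraint like this:

```python
# Illustrative sketch of chaining pre-processing transformers. Each
# transformer maps a sample to a sample; chaining composes them in order.

def chain(*transformers):
    def pipeline(sample):
        for t in transformers:
            sample = t(sample)
        return sample
    return pipeline

# Toy stand-ins for the three steps: decode bytes, resize/crop, normalize.
decode = lambda s: {"pixels": list(s), "label": 1}
crop = lambda s: {**s, "pixels": s["pixels"][:4]}
normalize = lambda s: {**s, "pixels": [p / 255 for p in s["pixels"]]}

preprocess = chain(decode, crop, normalize)
print(preprocess(bytes([0, 255, 51, 102, 9])))
# {'pixels': [0.0, 1.0, 0.2, 0.4], 'label': 1}

# The mini-batch size must be a multiple of the total cores in the job.
total_cores = 4  # e.g. 2 executors x 2 cores each
batch_size = 16
assert batch_size % total_cores == 0
```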

Transfer learning

Convolutional Neural Networks (CNNs) require large datasets and a lot of computational time to train. So instead of training from scratch, transfer learning adapts a pre-trained model to a new dataset. Previously, we extracted features up to the second-to-last layer and trained a new prediction layer. This time, we will not only train a new prediction layer but also allow the earlier, pre-trained layers to be tweaked if needed.

To elaborate, let’s say a CNN has a set of shared parameters θs (e.g., five convolutional layers and two fully connected layers), task-specific parameters for previously learned tasks θo (e.g., the output layer for ImageNet classification and corresponding weights), and randomly initialized task-specific parameters for new tasks θn (e.g., scene classifiers). It is useful to think of θo and θn as classifiers that operate on features parameterized by θs. Currently, there are three common approaches to learning θn while benefiting from previously learned θs [2].

  • Feature Extraction: Uses a pre-trained CNN to compute the features of an image. θs and θo are unchanged, and the outputs of one or more layers are used as features for the new task in training θn. The extracted features are activations of one layer (usually the last hidden layer) or multiple layers given the image. This is what we accomplished in the previous blog post.
  • Fine-tuning: Modifies the parameters of an existing CNN to train a new task and the output layer is extended with randomly initialized weights for the new task. That is, θs and θn are optimized for the new task, while θo is fixed. We employ this strategy in the current post. It is also possible to use a variation of fine-tuning where part of θs – the convolutional layers – are frozen and only top fully connected layers are fine-tuned. This can be seen as a compromise between fine-tuning and feature extraction.  
  • Joint Training: All parameters θs, θo, θn are jointly optimized, for example by interleaving samples from each task.
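The three strategies differ only in which parameter groups they optimize; a minimal sketch (hypothetical names, illustration only) of that distinction:

```python
# Which parameter groups each transfer-learning strategy updates:
# theta_s = shared layers, theta_o = old task head, theta_n = new task head.

STRATEGIES = {
    "feature_extraction": {"theta_s": False, "theta_o": False, "theta_n": True},
    "fine_tuning":        {"theta_s": True,  "theta_o": False, "theta_n": True},
    "joint_training":     {"theta_s": True,  "theta_o": True,  "theta_n": True},
}

def trainable(strategy):
    """Return the parameter groups a strategy optimizes."""
    return sorted(g for g, on in STRATEGIES[strategy].items() if on)

print(trainable("fine_tuning"))  # ['theta_n', 'theta_s']
```

Fine-tuning, used in this post, leaves θo fixed but lets the shared layers θs adapt along with the new head θn.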
Figure 1: Transfer Learning Approaches [2]

Using pre-trained models with BigDL

BigDL supports loading pre-trained models; all it needs is a model checkpoint file (the pre-trained weights), a model definition file, and a BigDL Module (network) that represents the layers of the neural network. The Caffe library has a Model Zoo where people share their checkpoints, also known as network weights. For our learning task we use the VGG 16-layer [3] model checkpoint available here.

To reuse the existing weights, we first need to make sure that the layer names in the VGG16NetCaltech module match the original network’s layer names found in the model definition file.

However, the VGG16-layer model was trained on the ImageNet dataset, which has 1,000 different classes, and we want to change that for the current application. This can be achieved by simply renaming the last fully connected layer from “fc8” to “fc8_caltech256” and changing the output size to the number of classes in the Caltech-256 dataset, that is, 257. Thus, we have defined a new hybrid model that uses the weights from a pre-trained model for all layers except the last one, saving tremendous resources and training time.
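The layer-renaming trick can be sketched with a toy model (hypothetical structure, not the BigDL Module API): the renamed final layer no longer matches any layer in the checkpoint, so only it gets freshly initialized weights.

```python
# Toy sketch of adapting a pre-trained network's output layer. Layers whose
# names match the model definition keep their pre-trained weights; the
# renamed last layer gets fresh weights sized for the new class count.

pretrained = {"fc7": "vgg_weights_fc7", "fc8": "imagenet_weights_1000"}

def adapt(model, old_name, new_name, n_classes):
    adapted = {k: v for k, v in model.items() if k != old_name}
    # A renamed layer cannot match any pre-trained layer, so it is
    # randomly initialized with the new output size.
    adapted[new_name] = f"random_init_{n_classes}"
    return adapted

model = adapt(pretrained, "fc8", "fc8_caltech256", 257)
print(model)
# {'fc7': 'vgg_weights_fc7', 'fc8_caltech256': 'random_init_257'}
```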

Distributed training

The training process requires you to specify the model, the training dataset, and the loss criterion. In this case, it’s the negative log likelihood, which is often used for classification problems.
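As a quick refresher on the criterion itself (an illustration, not BigDL's ClassNLLCriterion API): given the predicted class probabilities and the true class index, the negative log likelihood penalizes low probability assigned to the true class.

```python
import math

# Negative log likelihood for a single prediction: the loss is
# -log(probability the model assigned to the true class).

def nll_loss(probs, target):
    return -math.log(probs[target])

# A confident, correct prediction gives a small loss ...
print(round(nll_loss([0.05, 0.9, 0.05], target=1), 4))  # 0.1054
# ... while a confident, wrong prediction gives a large one.
print(round(nll_loss([0.05, 0.9, 0.05], target=0), 4))  # 2.9957
```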

Setting the hyperparameters for training.

Along with the optimization method and associated hyperparameters we specify how frequently to test the validation set accuracy, how many passes to make over the training data, when and where to save the model checkpoint and so on.

Now if you run the Spark application on the cluster (details on submitting the Spark application and the entire code are available here), you will notice that it begins training the network in batches of 16 images and outputs the loss and other hyperparameters used.

Visualizing Learning

Training a neural network can be complex and confusing. One may want to keep tabs on how the learning progresses in order to understand, debug, or further optimize it. For example, if in one run we randomize the order in which the images appear within the mini-batches, and in another we do not (keeping the rest of the hyperparameters unchanged), we find that the former approach reduces the training loss. The orange line in Figure 2 below depicts the training loss with randomization.

Figure 2. Comparing Model Runs

The process of enabling visualization via TensorBoard is straightforward. After installing TensorBoard, simply specify the location and a name (describing the run) under which to store the training and validation summary results. Note that by default TrainSummary displays the “loss” and “throughput” at each iteration; one can also enable “Parameters” or “LearningRate”, but keep in mind that this causes additional overhead and might slow down the training process.

Finally, launch TensorBoard to view the training summaries.
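Assuming the summaries were written to /tmp/bigdl_summaries (an illustrative path; use whatever location you configured for TrainSummary), the invocation looks like:

```shell
# Point TensorBoard at the directory holding the training/validation
# summaries; the path here is an example, not a required location.
tensorboard --logdir=/tmp/bigdl_summaries --port=6006
```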

The TensorBoard UI is then accessible at http://<ip>:6006, and it displays the loss, throughput, accuracy, and other statistics.

Figure 3. Model Training Summary

A Top-1 accuracy of ~5% on the validation set is pretty low to begin with but, as shown in Figure 3, by the end of the 15th epoch the loss goes down and the training Top-1 accuracy rises to 30%.

Model checkpoints and performance

We can also independently test the model performance on a test set using any of the trained model snapshots saved at the checkpoint location.

If the model performance improves initially and then starts to flatten or decrease, it might be a good idea to reduce the learning rate at that point while resuming training from where it left off. All one needs to do is use the model snapshot from the 15th epoch, which is a minor change to the code above.

And then specify a new set of hyperparameters:

You can also choose to retrieve the existing optimization snapshot file.
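The resume-and-decay idea can be sketched as follows (hypothetical names, not the BigDL Optimizer API): reload the snapshot saved at epoch 15 and continue training with a reduced learning rate.

```python
# Illustrative sketch: resume training from a saved snapshot with a
# smaller learning rate.

def resume(checkpoints, epoch, old_lr, decay=0.5):
    """Return the snapshot to resume from and the decayed learning rate."""
    snapshot = checkpoints[epoch]  # weights saved by the checkpoint trigger
    return snapshot, old_lr * decay

checkpoints = {15: "model.15"}     # snapshot file name is illustrative
snapshot, lr = resume(checkpoints, epoch=15, old_lr=0.01)
print(snapshot, lr)  # model.15 0.005
```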

Troubleshooting and Tuning Spark jobs

When running BigDL Spark applications, we have generally encountered out-of-memory (OOM) errors. Some of these errors tend to occur on the driver side, others on the executor side. For example, at times the job fails when trying to load the VGG16 model, and in this case the Spark web UI logs provide no further details. Increasing the driver memory should resolve this; it is controlled by the --driver-memory option when submitting the Spark application.

Later, when the training process tries to cache the model on the executors, it might fail during initialization, with a stack trace pointing to the issue. Increasing the executor memory does the trick here; it is controlled by the --executor-memory option when submitting the Spark application.
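Putting both options together, a spark-submit invocation with increased memory might look like this (the sizes, class name, and jar name are illustrative; tune them to your cluster and model size):

```shell
# Example spark-submit with raised driver and executor memory.
spark-submit \
  --master yarn \
  --driver-memory 20g \
  --executor-memory 40g \
  --class com.example.bigdl.TrainCaltech256 \
  bigdl-caltech256.jar
```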

Furthermore, at some point during the training iterations, such as at the end of an epoch, a trigger may save the model at the specified checkpoint location. The job might fail here too, with messages about the YARN scheduler losing executors. This can happen when the application fetches the current model from the executors to the driver, or when the driver saves the model (which is now >1 GB in size) to the specified checkpoint directory. Both are also out-of-memory situations and can be fixed by increasing either the executor or the driver memory.


In this post we used Apache Spark and Intel’s BigDL to train a Convolutional Neural Network, employing the fine-tuning transfer learning strategy. We saw how well they work together and how this framework makes it easier for data scientists and analysts to continue using their existing Apache Hadoop and Apache Spark platform as a unified data analytics platform. Because Intel’s BigDL is natively built on Spark, its familiar Scala and Python APIs make it easy for users to adopt, making it well-suited for deep learning on Cloudera.




[3] Very Deep Convolutional Networks for Large-Scale Image Recognition, K. Simonyan and A. Zisserman.

