Production Recommendation Systems with Cloudera


Many types of business problems boil down to making recommendations, and machine learning is the special sauce that makes these problems solvable. Machine learning for recommendations is a challenging endeavor in its own right, but it is just one part of the recommendation system, which must move, store, process, and update data, in production, across several different components. In this post we show how to use Cloudera’s distribution of open source software to build a production scale recommendation system, and how research from Cloudera Fast Forward Labs can be used to move the machine learning to the cutting edge.

What is a recommendation system?

I love watching movies, but I dread deciding what to watch. Can you remember the days when watching a movie meant spending 30-60 minutes combing through a brick-and-mortar video rental store, relying on pictures and serendipity to guide you? Nowadays, we can just log on to our favorite streaming service and it will actually tell us what we want to watch.

Underlying these movie suggestions are recommendation systems, usually powered by machine learning, that suggest a subset of movies that we are most likely to enjoy. Recommendation systems aren’t just for choosing movies, though. We interact with them constantly – reading the news, shopping online, searching the internet – basically, any time someone or something needs to find a few relevant items amongst many. If that sounds generic, it’s because it is: a surprising number of data problems fit the mold of a recommendation problem.

Great recommendations have been enormous differentiators for companies like Netflix, Amazon, LinkedIn, and others. Accordingly, recommendation systems are a very active area of research and continue to receive high interest from businesses looking to use machine learning for competitive advantage.

The latest research from Cloudera Fast Forward Labs

Cloudera Fast Forward Labs, an applied machine learning research division within Cloudera, recently released a report covering the latest advancements in recommendation systems, specifically highlighting the use of deep learning for incorporating unstructured data into recommendations. The report showed that unstructured, semantic content can be used to beat more traditional methods that only look at higher-level, structured data.

The prospect of deep learning impacting recommendation systems in the same way it has impacted computer vision and natural language processing is certainly exciting. But before you can make use of these new techniques, you need to be able to build and deploy a scalable recommendation system to production. That’s where Cloudera comes in.

Cloudera’s open source distribution of Apache Hadoop and its ecosystem is meant to provide organizations with a comprehensive set of tools needed to build these systems. In this post we’ll first see how Cloudera can help you build a traditional recommendation system using collaborative filtering for recommendations, and then discuss how to integrate cutting edge deep learning techniques for more personalized recommendations.

System Overview

Recommendation systems, like any production machine learning system, aren’t just fancy algorithms, but rather a collection of integrated components that move, store, and process data to meet specific latency, security, and scale requirements. While the exact architecture of a recommendation system will vary greatly depending on the nature of the application, most recommendation systems adhere to the following general structure.

Generic flow diagram for recommendation systems


We’ll discuss the purpose and requirements for each of these components in a general sense, but we’ll also provide a specific reference architecture that makes the discussion more concrete. The Oryx project from Cloudera is a reference implementation of the Lambda architecture on Hadoop, which serves as a useful example of a production ready recommendation system that contains each of these components.

Oryx project architecture


Machine Learning

The machine learning component needs to provide the means to train an algorithm that can learn from past data and produce a model that can be deployed to make intelligent recommendations for users. Crucially, the type of output of this algorithm will impact the design of other components like deployment and storage. We’ll also need to continually update the model as users generate new data by interacting with items.

Ideally, the model would refine itself immediately and optimally every time new data arrives, but in practice this is difficult to achieve. Instead, a common solution that works well in practice is to take advantage of the Lambda architecture.

Instead of using just online learning or just offline learning, the Lambda architecture manages a process for each. The online learning is approximate, producing updates from individual data samples, but provides the benefit of near-real-time feedback, while the offline update can learn from all historical data, since it runs less frequently.

Offline Training

Offline training is the primary learning mechanism for the machine learning recommendation algorithm. This process usually involves the traditional data science practice of training a model on a training set, doing cross validation to select the best hyperparameters, and exporting that model to a format that can be used by the serving layer.

Training and selecting the best model should be automated and run at a frequency that meets the needs of your application, often on the order of hours. It may be run using a job scheduling service, or even as a streaming job with a long update interval. In the case of Oryx, the offline training is implemented as an operation on a Spark DStream of new user/item interaction data. The new data that has arrived within the latest batch interval is combined with historical data from the Hadoop Distributed File System (HDFS) and used as the training data for the Spark ML alternating least squares (ALS) algorithm.

Alternating least squares is a type of matrix factorization algorithm that finds a numerical representation of each user and each item. Users and items are effectively embedded into a k-dimensional space, and their distance from each other in this space is a proxy for their “preference” for one another. Hence, item embeddings that are near a particular user embedding are likely to be good matches for that user. The output of the ALS model is a k-dimensional vector for every user and every item; these vectors are often stored as two matrices – one for user vectors and one for item vectors.
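To make the alternating step concrete, here is a minimal sketch of ALS on explicit ratings, written with only the Python standard library. It is not the Spark ML implementation that Oryx uses; the function names (`solve`, `update_side`, `als`, `score`), the regularization value, and the toy data layout are all assumptions made for illustration. The sketch scores a user/item pair with the dot product of their vectors, which is the usual choice for matrix factorization.

```python
import random

def solve(a, b):
    """Solve the small dense system a x = b by Gauss-Jordan elimination."""
    n = len(b)
    m = [row[:] + [b[i]] for i, row in enumerate(a)]
    for col in range(n):
        # Partial pivoting: move the row with the largest entry into place.
        pivot = max(range(col, n), key=lambda r: abs(m[r][col]))
        m[col], m[pivot] = m[pivot], m[col]
        for r in range(n):
            if r != col:
                f = m[r][col] / m[col][col]
                for c in range(col, n + 1):
                    m[r][c] -= f * m[col][c]
    return [m[i][n] / m[i][i] for i in range(n)]

def update_side(ratings, fixed, k, lam):
    """Re-solve every vector on one side while the other side stays fixed.

    ratings maps an id on this side to a list of (other_id, rating) pairs.
    Each vector is the ridge-regression solution of its own small system.
    """
    out = {}
    for this_id, observed in ratings.items():
        a = [[lam if i == j else 0.0 for j in range(k)] for i in range(k)]
        b = [0.0] * k
        for other_id, r in observed:
            v = fixed[other_id]
            for i in range(k):
                b[i] += r * v[i]
                for j in range(k):
                    a[i][j] += v[i] * v[j]
        out[this_id] = solve(a, b)
    return out

def als(triples, k=2, lam=0.1, iterations=20, seed=0):
    """Factorize (user, item, rating) triples into k-dimensional vectors."""
    rng = random.Random(seed)
    by_user, by_item = {}, {}
    for u, i, r in triples:
        by_user.setdefault(u, []).append((i, r))
        by_item.setdefault(i, []).append((u, r))
    items = {i: [rng.uniform(-1, 1) for _ in range(k)] for i in by_item}
    users = {}
    for _ in range(iterations):
        users = update_side(by_user, items, k, lam)   # items held fixed
        items = update_side(by_item, users, k, lam)   # users held fixed
    return users, items

def score(user_vec, item_vec):
    """Predicted preference: dot product of the two embeddings."""
    return sum(a * b for a, b in zip(user_vec, item_vec))
```

Each half-step solves a small regularized least-squares problem per user (or per item), so the regularized squared error never increases from one iteration to the next. A production system would hand this work to a distributed implementation rather than loop in Python.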

The Spark ALS model is just one example and could be replaced with other algorithms from scalable machine learning packages, or with custom algorithm implementations, like the one explored in our recommendations report. Matrix factorization algorithms like ALS are great starting points because they are simple, easy to implement, and can be generalized to almost any recommendation problem.

Online Training

The online training process involves updating a copy of the latest batch model as new data streams into the system. We need a place to store the model such that it can be updated quickly, and a streaming job that can learn the new model parameters from the data as it arrives. We also need an algorithm that can support incremental updates.

The ALS algorithm used in the Oryx project takes advantage of recursive least squares to make an approximate, but online, update for both user and item parameters. This update is approximate because it does not alternate until convergence like the full algorithm requires, but is still useful for incorporating new data in real time. The current working user and item matrices are held in memory on an edge node and updated via a Spark Streaming transformation, which maps a stream of new data to a stream of model updates.
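A textbook recursive least squares step gives a feel for this kind of update. The sketch below is not the Oryx code: it refreshes one user vector from a single new interaction while treating the item vector as fixed, and the names `new_user_state` and `rls_update`, as well as the regularization parameter `lam`, are hypothetical.

```python
def new_user_state(k, lam=0.1):
    """Start a user at the zero vector with P = (lam * I)^-1."""
    x = [0.0] * k
    p = [[(1.0 / lam if i == j else 0.0) for j in range(k)] for i in range(k)]
    return x, p

def rls_update(x, p, y, r):
    """One recursive-least-squares step for a single new observation.

    x: current user vector (length k)
    p: current inverse of (lam * I + sum of y y^T), a k x k list of lists
    y: vector of the item the user just interacted with
    r: observed strength of the interaction
    Returns the updated (x, p) without refitting on the full history.
    """
    k = len(x)
    py = [sum(p[i][j] * y[j] for j in range(k)) for i in range(k)]  # P y
    denom = 1.0 + sum(y[i] * py[i] for i in range(k))              # 1 + y^T P y
    gain = [v / denom for v in py]
    error = r - sum(x[i] * y[i] for i in range(k))                  # prediction error
    new_x = [x[i] + gain[i] * error for i in range(k)]
    # P is symmetric, so y^T P equals (P y)^T; this is the Sherman-Morrison update.
    new_p = [[p[i][j] - gain[i] * py[j] for j in range(k)] for i in range(k)]
    return new_x, new_p
```

With the item vectors held fixed, repeating this step over a user's interactions reproduces the regularized least-squares solution for that user. The approximation in the online path comes from not alternating back to re-solve the item side.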

Online updates to the model can also help the system cope with the cold-start problem. When a user first visits the service, we won’t know anything about that user’s preferences and may be forced to make coarse or non-personalized recommendations. Having an online update, however, means that when a user interacts with their first item(s), we can immediately refine their recommendations to include this new information. This is especially important since the first interactions users have with a service are critical for user retention.
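One way to picture the cold-start fallback is a serving function that returns popular items until a user vector exists. This is a minimal sketch under assumed data structures (plain dictionaries for `user_vectors`, `item_vectors`, and a `popularity` count per item); none of these names come from Oryx.

```python
def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def recommend(user_id, user_vectors, item_vectors, popularity, n=3):
    """Return up to n item ids, personalized when a user vector exists."""
    user_vec = user_vectors.get(user_id)
    if user_vec is None:
        # Cold start: nothing is known about this user yet, so fall back
        # to the most popular items overall.
        return sorted(popularity, key=popularity.get, reverse=True)[:n]
    # Known user: rank items by the dot product with the user vector.
    return sorted(item_vectors,
                  key=lambda i: dot(user_vec, item_vectors[i]),
                  reverse=True)[:n]
```

As soon as the online update writes a first vector for the user, the same call switches from the popularity list to personalized results.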

Front End

The front end is typically some sort of web service where users will interact with the application and receive recommendations. With regard to the recommendation component of the system, the front end needs to be able to scale to serve recommendations to many users concurrently, as well as forward new user/item interaction data to the transport layer. The front end for the Oryx project is a simple web server implemented using Apache Tomcat. The server also houses the deployment or serving layer, and writes newly generated interaction data to an Apache Kafka topic.

Data Transport

Data will be constantly flowing between various parts of the system that produce and consume it, which requires a dedicated transport layer for reliable delivery. New data will be continuously generated as users interact with the application and will need to be processed and moved to long term storage, as well as incorporated in training new models. Updates to the recommendation model will be communicated to the serving layer and also written to long term storage.

The transport layer should:

  • Accept multiple input streams from a variety of sources
  • Deliver data streams to multiple downstream consumers
  • Be fault-tolerant
  • Operate at extreme scales
  • Add and subtract data producers and data consumers with no downtime
  • Perform stream processing on in-flight records

Apache Kafka is a good choice here, especially since it allows for an arbitrary number of data producers and consumers. Moving all data in the system through Kafka can simplify things because each component only needs to worry about reading from or writing to Kafka, instead of integrating with several different sources or consumers. Kafka wasn’t designed for stream processing, however, and so other streaming components like Spark Streaming or Apache Flume may be used in conjunction with Kafka.

The Oryx project uses Apache Kafka and Spark Streaming in its data transport layer. New user and item interaction data enters the system via the Apache Tomcat front end, where it is parsed and written to the Kafka input topic. This data is consumed by both the real-time update component and a Spark DStream, which then writes the data in mini-batches to HDFS. There is also a Kafka topic that receives model updates from both the online and offline training components; it is consumed by the front end and the online training component.

Users may wish to extend this architecture by sending processed streams of data to a real-time dashboard or BI tool, which is easy to do with Kafka since the tool can just be added as a separate consumer of the stream. Additionally, this architecture makes it easy to take data from the front end and route it to some sort of A/B testing tool, which is critical for model evaluation.
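The property that makes this extension easy is that each consumer tracks its own position in a shared, append-only log. The toy class below illustrates that idea in memory. It is a stand-in for the pattern and not Kafka's client API; the `Topic` class, its methods, and the consumer names are all hypothetical.

```python
class Topic:
    """In-memory stand-in for a topic: an append-only log plus one
    read offset per consumer."""

    def __init__(self):
        self.log = []
        self.offsets = {}

    def publish(self, record):
        """Producers only ever append to the end of the log."""
        self.log.append(record)

    def poll(self, consumer, max_records=100):
        """Return the records this consumer has not seen yet."""
        start = self.offsets.get(consumer, 0)
        batch = self.log[start:start + max_records]
        self.offsets[consumer] = start + len(batch)
        return batch


input_topic = Topic()
input_topic.publish({"user": "u1", "item": "a", "strength": 1.0})
input_topic.publish({"user": "u2", "item": "b", "strength": 1.0})

# The online and offline paths read the same records independently.
for_online_update = input_topic.poll("speed-layer")
for_long_term_storage = input_topic.poll("batch-layer")

# A dashboard added later is just another consumer with its own offset;
# neither the producer nor the existing consumers change.
for_dashboard = input_topic.poll("dashboard")
```

A real deployment would rely on Kafka for durability, partitioning, and fault tolerance, none of which this sketch attempts.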


Model Deployment

How a model is deployed defines the process by which a user is served a list of recommendations when they visit the service. This will usually depend greatly on what the output of the trained model is. For example, it could be a relatively small list of parameters for a classification model, or it could be two large matrices in the case of a matrix factorization algorithm. Another concern is how features for the model will be generated or retrieved at prediction time. Some models may not require additional features, while others may retrieve stored features from a low-latency data store or compute them at runtime.

Unfortunately, there is not necessarily a common or obvious solution to model deployment. What you should choose depends greatly on the latency and scale requirements of the application. In the Oryx architecture, a custom implementation for model deployment was used, which stores the ALS matrices in memory and uses a specialized variant of locality sensitive hashing (LSH) and an approximate nearest neighbor search for fast predictions. Model updates from the online training process are incorporated by reading from the model update Kafka topic, and updating the model stored in the cache.
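To illustrate the general idea, the sketch below uses plain random-hyperplane LSH, which is not the specialized variant in Oryx. Items are bucketed by the signs of their dot products with a few random hyperplanes, and a query only scores the items that share the user's bucket. The `LshIndex` class, its method names, and the default of four hyperplanes are assumptions for illustration.

```python
import random
from collections import defaultdict

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

class LshIndex:
    """In-memory item index that narrows candidates with random hyperplanes."""

    def __init__(self, item_vectors, num_planes=4, seed=0):
        rng = random.Random(seed)
        k = len(next(iter(item_vectors.values())))
        self.planes = [[rng.gauss(0, 1) for _ in range(k)]
                       for _ in range(num_planes)]
        self.items = {}
        self.buckets = defaultdict(list)
        for item_id, vec in item_vectors.items():
            self.upsert(item_id, vec)

    def _signature(self, vec):
        # One bit per hyperplane: which side of the plane the vector lies on.
        return tuple(dot(plane, vec) >= 0 for plane in self.planes)

    def upsert(self, item_id, vec):
        """Insert or replace an item vector, e.g. when a model update arrives."""
        old = self.items.get(item_id)
        if old is not None:
            self.buckets[self._signature(old)].remove(item_id)
        self.items[item_id] = vec
        self.buckets[self._signature(vec)].append(item_id)

    def recommend(self, user_vec, n=3):
        """Score only the items in the user's bucket (approximate search)."""
        candidates = self.buckets.get(self._signature(user_vec))
        if not candidates:
            candidates = list(self.items)  # empty bucket: fall back to exact search
        ranked = sorted(candidates,
                        key=lambda i: dot(user_vec, self.items[i]),
                        reverse=True)
        return ranked[:n]
```

More hyperplanes mean smaller buckets and faster queries, at the cost of missing good items that fall just across a plane. That tradeoff is why the search is approximate.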

This custom solution required significant engineering overhead, but is important if low-latency predictions are a requirement. There are other options if latency associated with network requests is acceptable, for instance using a custom scoring service that serves recommendations on request.

Deploying deep models

For pretty much any recommendation application, it is expected that you’ll need to explore different learning algorithms for making the best possible recommendations. Naturally, there will be added effort each time a new modeling approach is added or swapped into the system. However, if two different learning approaches produce the same or similar types of models, you will already have a significant portion of the required infrastructure in place, and there will be less overhead involved in making the switch.

Fortunately, many recommendation algorithms fit a general pattern, but differ on the implementation details: use past data to learn vector space representations (embeddings) for users and items, and use the distance between these users and items in the embedded space as a measure of similarity and therefore preference. The output of the alternating least squares algorithm used in the Oryx project is, in fact, a vector representation of each user (user matrix) and a vector representation of each item (item matrix). Conveniently, the output of the deep learning model used in the Cloudera Fast Forward Labs report prototype is also a set of user and item embedding vectors. A big advantage of designing a first iteration of your system with an ALS algorithm is that it is much simpler to add more advanced deep learning techniques in the future.

To make this change in the Oryx architecture, we simply need to worry about swapping out the offline and, optionally, the online training component. The deployment layer can stay exactly the same, since we’ll just swap out the ALS embeddings with the embeddings from the new deep learning model!

For the offline component, we can schedule a job that will stream training data in mini-batches from HDFS to an edge node, preferably equipped with one or more GPUs, and use Keras with TensorFlow for batch training. Alternatively, we could use a Spark-based deep learning framework like Deeplearning4j or Intel’s BigDL and leave the offline training component as a Spark job launched on the cluster. Because neural networks are by default trained incrementally using gradient descent, the online component is similarly easy to replace. The new data that arrives in each time interval can be treated as a mini-batch of training data, and the model can be updated by making a forward and backward pass over the model. We won’t need GPUs for this part, since we just make one pass through the network for each new batch.
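The shape of that online update can be shown without a deep learning framework. The sketch below does not reproduce the deep model from the report. It uses a plain dot-product embedding model whose gradient can be written by hand, and treats each batch of new interactions as one pass of gradient steps. The function names, learning rate, and regularization value are assumptions.

```python
def predict(user_vec, item_vec):
    """Forward pass: predicted preference is the dot product."""
    return sum(a * b for a, b in zip(user_vec, item_vec))

def train_on_batch(users, items, batch, lr=0.05, reg=0.01):
    """Make one pass over a mini-batch of (user, item, rating) records,
    updating the embedding dictionaries in place."""
    for u, i, r in batch:
        pu, qi = users[u], items[i]
        err = r - predict(pu, qi)  # forward pass and prediction error
        # Backward pass: gradient of the regularized squared error with
        # respect to each embedding, computed from the old values of both.
        users[u] = [a + lr * (err * b - reg * a) for a, b in zip(pu, qi)]
        items[i] = [b + lr * (err * a - reg * b) for a, b in zip(pu, qi)]
```

With a framework such as Keras, the loop body would be replaced by a single training call on the new batch. The surrounding streaming job stays the same.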

In building a production recommendation engine, it is usually better to start with a simpler model like matrix factorization, and consider moving to more complex techniques if the simpler model isn’t performing adequately. In most cases we wouldn’t start by replacing the matrix factorization, but by adding the deep learning model as an additional model that can be used in combination with the collaborative filtering approach.


Conclusion

An effective recommendation system requires not only a mechanism for making good recommendations, but an entire infrastructure that can get those recommendations to users when needed and can adapt over time. Even simple implementations like the Oryx architecture can be tricky to get right, and various tradeoffs must be made. A sophisticated or cutting edge learning algorithm is useless if you can’t effectively productionize it.

Instead of fixating on complex modeling techniques, first focus on getting a simpler recommendation algorithm to production and building an extensible infrastructure around it. The needs of the system will inevitably change over time, so the infrastructure should support extending and modifying the existing components and modeling techniques. When and if more personalized recommendations are required, we can swap in new deep learning models if the infrastructure is well-designed.


