Recently we worked with a customer that needed to run a very large number of models each day to satisfy internal and government-regulated risk requirements. Several thousand model executions would need to be supported per hour, and total execution time was very important to this client. In the past, the customer used thousands of servers to meet the demand. They need to run many variations of a model with different economic factors to satisfy their requirements. For example, a financial model may calculate risk to the bank based on many different runs with varying economic factors. This particular model was planned to consume up to 40K CPU cores once in production. The reason for so many cores is simple: these jobs need to complete as quickly as possible so that the business, and sometimes the government, can test how varying economic factors affect a financial institution and make adjustments. The cycle in which they run these jobs is very compressed and allows very little room for error.
This customer used a traditional HPC (High Performance Computing) Grid to accomplish this type of use case in the past. A Grid is a collection of servers that run many parallel compute tasks at once. Grid applications are historically written in C++ and orchestrated with scripting languages like Python.
We helped the customer move an existing model written in C++ to Apache Spark. Utilizing Apache Spark and Cloudera allows them to use their existing C++ code with little modification. Additionally, they can run these workloads seamlessly in the cloud. Recently Cloudera released the Cloudera Data Science Workbench (CDSW). CDSW accelerates data science from exploration to production using R, Python and Apache Spark. All of this comes with the same security and data governance they are accustomed to with Cloudera.
What is Apache Spark?
Apache Spark provides flexible, in-memory data processing for Hadoop. It is primed to replace MapReduce as the standard data processing engine in Hadoop, and will achieve that with the work done with the One Platform Initiative. Key benefits are:
Familiar language support and interactive development
Flexible, Extensible API
Supports multiple workloads (batch, stream, machine learning)
Faster Batch and Stream Processing
In-memory processing and caching
Cast Of Characters
Quants or Data Scientists
In financial institutions, Quantitative Analysts (“Quants”) design and develop complex models to satisfy business or regulatory requirements. In this example the Quant is tasked with developing risk models. Quants have been doing “Data Science” since before the term Data Science was coined.
The Quants stand to gain the most from this project.
Grid Engineering Team
The Grid Engineering Team is responsible for a very large on-premises grid at this company, spanning many thousands of nodes. They have written their own schedulers and have years of experience operating large compute grids. With such formalized runtimes, we expected to be met with resistance from this team. However, our experience was quite the opposite. They were very open to having Hadoop coexist on the grid, and they were excited at the prospect of using HDFS (Hadoop Distributed File System) for more than just Spark on their grid. The Grid team also benefited from using Cloudera Manager to manage and report on the grid infrastructure.
Hadoop Administration Team
The Hadoop Team is responsible for several large Hadoop Clusters. They are accustomed to using 2U nodes with 24TB of Storage. In this case their modeling growth needs were outpacing their available rack space. Additionally, there was an initiative to move larger modeling workloads to the cloud over time. Both the Hadoop team and the grid engineering team mentioned above can benefit from utilizing the on premise grid during and after the cloud transition. Utilizing the Grid for denser compute and the existing Hadoop cluster for more dense storage allowed the customer to grow compute and storage independently.
The quant team at the bank manages an extensive library of models. This model library has been built over several years and is very valuable to the organization. The team needed a way to leverage these models with very little customization. We recommended PySpark to meet their needs. The team is very comfortable with Python and C++ based on their past development experience.
The Test Model
We developed a simulation model that did not contain proprietary information to prove Spark's scalability to the customer. To do this, we distributed the compiled C++ shared library (.so) to the executors with Spark's addFile mechanism (SparkContext.addFile) and used the model as a Python extension. See https://docs.python.org/2/extending/extending.html for more details on Python module development. The C++ required very little modification and was easily packaged as a Python module to run in PySpark.
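The distribution step described above can be sketched roughly as follows. This is a minimal illustration, not the customer's code: the module name `riskmodel`, its `simulate` function, and the helper names are hypothetical, and it assumes the compiled extension is importable once its directory is on `sys.path`. `SparkContext.addFile`, `SparkFiles.getRootDirectory`, `parallelize`, and `mapPartitions` are standard PySpark calls.

```python
import importlib
import sys


def load_model(module_name, search_dir):
    """Import the compiled extension from the directory it was shipped to."""
    if search_dir not in sys.path:
        sys.path.insert(0, search_dir)
    return importlib.import_module(module_name)


def run_partition(scenarios, module_name="riskmodel", search_dir=None):
    """Run every scenario in one partition through the (hypothetical) model."""
    if search_dir is None:
        # Imported lazily so the helper can be exercised without a cluster.
        from pyspark import SparkFiles
        # Files shipped with SparkContext.addFile land in this directory.
        search_dir = SparkFiles.getRootDirectory()
    # Load once per partition, not once per scenario.
    model = load_model(module_name, search_dir)
    for factors in scenarios:
        # `simulate` is an assumed entry point exposed by the extension.
        yield model.simulate(*factors)


def run_on_spark(sc, so_path, scenarios, num_slices):
    """Ship the .so to every executor and fan the scenarios out."""
    sc.addFile(so_path)
    rdd = sc.parallelize(scenarios, num_slices)
    return rdd.mapPartitions(run_partition).collect()
```

Loading the module inside the partition function means each long-lived Python worker imports the extension on its own node, which is why the shared library has to be shipped to every executor first.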
The process was relatively smooth apart from a memory leak in the customer-provided C++. Historically, the grid created a new Python interpreter for each model execution, so any leaked memory was reclaimed when that interpreter exited. In PySpark, the number of cores per executor equates to the number of Python interpreters used, and these interpreters are used for the life of the entire job (PySpark uses Py4J under the covers). With the leak multiplied across every core on an executor and consuming the executor's memory, the job was running out of memory during longer or bigger executions. Thus, garbage collection (GC) is more important: if references are not released properly, the YARN container will run out of memory on a longer-running job.
See section 1.10 in https://docs.python.org/2/extending/extending.html. This section details how to release object references properly with Py_DECREF so that they can be garbage collected.
After the memory leak was fixed, the Python interpreter ran with a constant 600 MB of memory usage. Prior to this change, we would see the common YARN container memory limit error (Container memory limit reached, executor memory was set to 80g).
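To see why a small leak matters once interpreters are long-lived, a back-of-the-envelope calculation helps. The sketch below is illustrative only: it assumes the container's memory budget is shared evenly across the Python workers (which overstates what they really get, since the JVM also needs memory), and the leak size per call is a made-up number. The 80 GB, 16-core, and 600 MB figures are the ones quoted in this post.

```python
def per_worker_budget_mb(container_mb, cores_per_executor):
    """Memory each Python worker could use if the budget is split evenly."""
    return container_mb / cores_per_executor


def calls_until_limit(budget_mb, baseline_mb, leak_mb_per_call):
    """Number of model executions a worker survives before hitting its budget.

    Returns None when nothing leaks, because the worker then never runs out.
    """
    if leak_mb_per_call <= 0:
        return None
    headroom_mb = budget_mb - baseline_mb
    if headroom_mb <= 0:
        return 0
    return int(headroom_mb // leak_mb_per_call)


budget = per_worker_budget_mb(container_mb=80 * 1024, cores_per_executor=16)
# Hypothetical leak of 1 MB per model execution on top of a 600 MB baseline.
print(budget, calls_until_limit(budget, baseline_mb=600, leak_mb_per_call=1))
```

On the old grid this count never mattered, because each execution started in a fresh interpreter.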
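One way to catch this kind of leak before a job reaches the cluster is to call the model repeatedly in a single interpreter and watch whether memory keeps growing. The sketch below is an assumption-laden illustration, not what was used on this project: it relies on Python 3's `tracemalloc`, which sees objects allocated through Python's allocators (the kind a missing Py_DECREF leaves behind) but not raw C++ allocations, and `leaky_call` / `clean_call` are stand-ins for a real extension function.

```python
import tracemalloc


def traced_growth(fn, iterations=100):
    """Return bytes of Python-level memory still held after repeated calls."""
    tracemalloc.start()
    try:
        fn()  # warm-up call so one-time caches are not mistaken for a leak
        before, _peak = tracemalloc.get_traced_memory()
        for _ in range(iterations):
            fn()
        after, _peak = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return after - before


_retained = []


def leaky_call():
    # Stand-in for an extension call whose result is never released.
    _retained.append([0.0] * 1000)


def clean_call():
    # Stand-in for a call whose temporary result is freed on return.
    [0.0] * 1000


print("leaky:", traced_growth(leaky_call))
print("clean:", traced_growth(clean_call))
```

A growth figure that scales with the iteration count suggests references are being held; a figure that stays near zero is what the fixed model should show.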
Proof Of Concept(s)
We ran a few smaller proofs of concept (based on the same simulation code mentioned above) before finally ramping up to 2500 nodes in the cloud. We were able to show linear scaling of Spark at 625, 1250 and 2500 nodes respectively. Interestingly enough, we were able to provision 2500 nodes in the cloud in 90 minutes. Each node in the cluster has 16 cores and 120 gigabytes of RAM.
The configurations below were necessary to run the model simulation on 2500 nodes in the cloud. Additionally, we raised the YARN ResourceManager and NodeManager Java heaps to 16 gigabytes.
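The cluster sizes above translate into aggregate capacity as follows. The node counts and per-node figures come from this post; the function names are ours, and `scaling_efficiency` is only a generic way to check linearity from measured runtimes, which are not published here.

```python
NODE_COUNTS = (625, 1250, 2500)
CORES_PER_NODE = 16
RAM_GB_PER_NODE = 120


def cluster_totals(nodes, cores_per_node=CORES_PER_NODE, ram_gb_per_node=RAM_GB_PER_NODE):
    """Aggregate cores and RAM for a cluster of identical nodes."""
    return {
        "nodes": nodes,
        "cores": nodes * cores_per_node,
        "ram_gb": nodes * ram_gb_per_node,
    }


def scaling_efficiency(base_nodes, base_seconds, nodes, seconds):
    """Measured speedup divided by ideal speedup; 1.0 means perfectly linear."""
    speedup = base_seconds / seconds
    ideal = nodes / base_nodes
    return speedup / ideal


for n in NODE_COUNTS:
    totals = cluster_totals(n)
    print(totals["nodes"], "nodes:", totals["cores"], "cores,", totals["ram_gb"], "GB RAM")
```

At 2500 nodes this works out to 40,000 cores, which lines up with the 40K cores planned for production.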
Spark Configuration Properties
//this is max we can get to for serialization memory
//queue size is what pushed us to spark 2.0
//to avoid a lot of garbage being spit out
//spark yarn settings
dfs.datanode.handler.count = 64
dfs.namenode.handler.count = 500
dfs.namenode.service.handler.count = 500
What are some considerations and benefits of offloading your Compute Grid?
- Do You Have a Legacy Compute Grid?
  - Do you want to have more flexibility in how you use your grid? Portions of the Hadoop ecosystem can open your grid to more users.
  - Do you want to use a platform-neutral compute language? Spark workloads can easily be run on a grid, in a more traditional Hadoop Cluster, or as a transient workload in the Cloud.
- Are you looking to decrease Model Dev Cycle Time?
  - Spark is a higher-level API/framework that supports Python, Scala and Java. It can be used by developers of different experience levels.
  - Spark allows for very simple data manipulation, allowing the Data Scientist or Quant to spend more time on the business problem.
  - PySpark allows a company that is using a legacy C++ grid to re-use their C++ library assets with few or no changes. Python to C++ bindings result in minimal performance penalties.
  - Cloudera Data Science Workbench (CDSW) allows Quants/Data Scientists to rapidly develop and visualize models with more involvement from the business.
- Is Your Model Input Data or Output Data Growing Rapidly?
  - Legacy Grids have very little storage, whereas Hadoop Clusters organize compute and storage together.
  - Using Hadoop allows you to utilize your grid investment for compute and to run the same workloads with no modification in the Cloud or on an internal Hadoop Cluster.
  - In infrastructures with direct-attached storage, Hadoop’s locality-based processing allows for fast, efficient movement of data between storage and compute.
- Are you interested in Moving Legacy Compute Grid Workloads to Cloud?
  - Deploying Hadoop on a portion or on all of your grid allows you to use the same tools on the grid that you would use on a Cloud-based Hadoop Cluster.
For this Cloudera customer we were able to show an optimization of the entire model development cycle. The end-to-end cycle time for a new model or change is drastically reduced by allowing the teams to speak the same language. Reducing the development cycle time is paramount: it allows the teams to respond more quickly to regulatory concerns. The modeling teams can continue to develop in C++ and at the same time more fully embrace Spark. The Quants can access important data sets in a managed and governed way. Additionally, with Cloudera Data Science Workbench they are able to do collaborative development with other Quants and quickly present visualizations to business partners. The infrastructure teams benefit from a single data operating system that can be deployed on premises on normal Hadoop nodes, on existing grid infrastructure and eventually in the cloud.
To learn more about data engineering and data science at Cloudera visit https://www.cloudera.com/products/data-science-and-engineering.html