In this post, engineers from Wargaming.net, the online game developer and publisher, describe the design of their real-time recommendation engine built on CDH.
The scope of activities at Wargaming.net extends far beyond the development of games. We work on dozens of internal projects simultaneously, and our Data-driven Real-time Rules Engine (DDRRE) is among the most ambitious.
DDRRE is a system that analyzes large amounts of data in real time, allowing us to personalize game interaction with players through recommendations, without unwanted advertising or promotional messages. Those recommendations are based on the user’s most recent game-play session. The result is a much better customer experience overall.
In this post, we’ll describe the architecture of DDRRE (which is based on CDH) and explain some of the lessons we learned during its design and implementation.
Data is collected using a common bus, Apache Kafka. All game subsystems write logs to the bus in real time. Because some subsystems cannot do that due to technical limitations, we wrote adapters that collect and forward the logs to Kafka. Our stack contains adapters for MySQL, PostgreSQL, and RabbitMQ, as well as one for loading historical data from our enterprise data hub through a Hive JDBC interface. All adapters were developed in Java 8 and Scala. (We intend to wrap them as Cloudera Manager parcels.) Here are some additional details:
- We based the MySQL and PostgreSQL adapters on the Tungsten Replicator, an open source replication engine that was invented at VMware. It includes a MySQL replication adapter for reading data from binary logs. (Replication is a reliable way to obtain data without burdening the database server.) We also developed a producer, which sends data to Kafka and stores the last recorded offset in Apache ZooKeeper.
- The adapter for RabbitMQ simply transfers data across queues. Entries are transferred to Kafka one by one, and then are ‘acknowledged’ in RabbitMQ. The service delivers the message at least once; deduplication occurs on the data side.
- When there is a need to process historical data, we bring it into the enterprise data hub via JDBC for processing by Apache Hive or Apache Impala (incubating).
Data Analysis in DDRRE
The WG Hub subsystem gets a lot of information from different data sources, but not all of it is ready for direct use without some transformation. To transform the raw data, we use a module called Raw Data Transformer (RDT) to store the business logic for data integration. Eventually, a standardized message is produced that represents a certain event with a set of attributes. The message goes to Kafka as a serialized Java object. RDT processes the number of topics equal to the number of data sources, resulting in one topic with a flow of different events, partitioned by the player’s identifier. This approach ensures that at the next step, data about a specific player is processed by one executor (using Spark Streaming’s
directStream) and reserved for the partition.
The main drawback of this module is the need to edit the code and redeploy in case the structure of input data changes. We are evaluating options such as using some kind of a meta-language in the transformation to make changes in the logic without re-writing the code.
The main task of this module is to give the end user the ability to create rules in the system. Rules will react to the events in the data bus, and, bearing historical data on the player, will send notifications to the target system on the basis of user-defined logic.
We use Drools for this purpose for several reasons:
- It’s written in Java, so integration is easier.
- It’s bundled with a GUI for creating rules (not the most robust UI, but still useful).
- Its KieScanner component allows updating of rules without a restart.
- Drools can be used as a library without having to install additional services.
- It has a large community should we need help with something.
Our repository of historical information form the player runs on Apache HBase. Processing is linked to the player’s ID, and HBase is solid for balancing the load and sharing data across regions. Of course, we get the best response time when all data fits in blockCache.
BRE works like this:
Drools distributes the rules as a compiled JAR file, so at the first stage, we’ve established a local Maven repo and set up a project in the Workbench to deploy to the repository through the distributionManagement section in the
When the Spark application starts, a separate Drools KieScanner process starts in every executor. Those processes periodically check the artifact with the rules in Maven. The version for testing the artifact is installed in LATEST, allowing to load new rules into the current running code as they appear.
When a batch of new events gets into Kafka, BRE processes them and reads blocks of historical data about each player from HBase. Then events with the player’s data transfer to the Drools StatelessKieSession, where they are checked for compliance with the currently-loaded rules. As a result, the list of triggered rules is recorded in Kafka. In-game tips and suggestions for the user are based on this list.
Here are some improvements and optimizations we made along the way:
- Serialization of historical data for storing in HBase. Initially we used the Jackson JSON processor to persist data, with the same object definition used in two places: rules and store. However, that approach prevented proper optimization of the storage format and forced unnecessarily complex annotations in Jackson. So, we made a decision to split the object into POJO for rules, and class-generated on Protobuf for serialization. As a result, POJO, used in the workbench, gained human-readable structure and can act as a “proxy” for protobuf-object.
- Optimization of HBase lookups. During testing we noticed that several events from the same account got into the processing batch. The case was game specific. Since a query to HBase is a very resource-intensive operation, we decided to group accounts in batches by ID and read the historical data just once for the entire group. This optimization has reduced the number of requests to HBase by 3x-5x.
- Optimization of Data Locality. In our cluster, each machine combines Kafka, HBase, and Spark instances. The processing starts with reading from Kafka. Locality allocates the processing to the host with the leader of the partition. However, if one considers this entire process, it becomes clear that the volume of data read from HBase is significantly higher than the amount of data from incoming events. Thus, the sending of data over the network takes more resources. To optimize the processing step after data is read from Kafka, we added an additional shuffle that rearranges data into separate HBase regions and sets the locality. This change significantly reduced network traffic and increased productivity as each Spark task now refers to only one particular HBase region–not to all of them.
- Optimization of resources used by Spark. With more processed partitions and fewer executors, locality wait time was much higher than processing time. So, to free more processing time, we reduced spark.locality.wait.
The current version of the module is coping with tasks well. However, there’s still room for improvement.
Future plans include a “Rules as a Service” system, or the ability to trigger rules upon request from the external service through the API, not due to in-game events. This will respond to such queries: “What is the rating at this player?”, “Which segment it belongs to?”, “What item is best suited for him?” and so on.
We hope that you find DDREE to be inspirational for your own projects!