How-to: Build a Complex Event Processing App on Apache Spark and Drools


Combining CDH with a business-rules execution engine can serve as a solid foundation for complex event processing on big data.

Event processing involves tracking and analyzing streams of data from events to support better insight and decision making. With the recent explosion in data volume and diversity of data sources, this goal can be quite challenging for architects to achieve.

Complex event processing (CEP) is a type of event processing that combines data from multiple sources to identify patterns and complex relationships across various events. The value of CEP is that it helps identify opportunities and threats across many data sources and provides real-time alerts to act on them. Today, CEP is used across many industries for various use cases, including:

  • Finance: Trade analysis, fraud detection
  • Airlines: Operations monitoring
  • Healthcare: Claims processing, patient monitoring
  • Energy and Telecommunications: Outage detection

Like most problems in the analytics world, CEP is complicated by the exponential growth of data. Fortunately, CDH (Cloudera’s open source platform containing Apache Hadoop, Apache Spark, and related projects) can serve as a foundation for CEP on big data via integration with an open source business-rules execution engine. In this post, you will learn how that integration could be done.

Architecture and Design

An example of a high-level architecture for building CEP on CDH is shown below. Depending on use cases, the components involved may vary.

[Figure: High-level architecture for CEP on CDH]

The components used in the architecture above are:

  • Ingest: Apache Kafka or Apache Flume to ingest events
  • Storage: Apache HBase (or in the future, perhaps Kudu) to store and retrieve events
  • Alerting: Alerts via Kafka or other direct API integrations
  • Stream processing: Event processing is done with Spark Streaming. The processing occurs in micro-batches and involves parsing, lookup, persistence, building the current state from a series of historical events, and custom processing logic. For example, you could join various streams of Spark RDDs over a sliding-window interval and gain insights about trends in near real-time. The batching can be as frequent as every second, leading to an end-to-end latency of a few seconds or less. (A brief windowing sketch follows this list.)
  • Business process management: A rules framework gives technical and non-technical users the ability to design complex business logic. Business users can be directly involved to design a flexible decision automation process that is subject to complex, variable, and evolving business rules. For this post, you will integrate Spark with Drools, one of the most popular options, to evaluate business requirements.
  • Metrics: A dashboard based on a time-series database like OpenTSDB to provide metrics. You could also use Cloudera Search + HUE to deliver the same capabilities.
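
To make the sliding-window idea above concrete, here is a minimal sketch, assuming two socket feeds of comma-separated "patientId,reading" lines (the hosts, ports, and record format are made up for illustration):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object WindowSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("cep-window-sketch")
        val ssc  = new StreamingContext(conf, Seconds(1))        // one-second micro-batches

        // Two hypothetical feeds of "patientId,reading" lines, keyed by patient id
        val vitals     = ssc.socketTextStream("localhost", 9999).map(_.split(",")).map(f => (f(0), f(1)))
        val admissions = ssc.socketTextStream("localhost", 9998).map(_.split(",")).map(f => (f(0), f(1)))

        // Join the two streams over a 30-second window that slides every 5 seconds
        val joined = vitals.window(Seconds(30), Seconds(5))
                           .join(admissions.window(Seconds(30), Seconds(5)))

        joined.foreachRDD(rdd => rdd.take(10).foreach(println))  // inspect joined events per micro-batch
        ssc.start()
        ssc.awaitTermination()
      }
    }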

To keep the example simple, this post demonstrates the integration between Spark and Drools using random data. You can store incoming event data and evaluation results in HBase. (Reading from and writing to Kafka, event snapshot/state building using HBase, and so on are out of scope here, but you can get background about those topics here and here.)

For the purpose of this example, let’s consider running all the sepsis-shock criteria using Drools. You will essentially be evaluating the conditions mentioned here.

[Figure: SIRS and sepsis criteria]

As you can see from the above picture, a patient would meet the SIRS criteria if any two vitals go out of range within a 24-hour window. These vitals arrive at different times, so typically you would use HBase to reconstruct a patient’s state every time a new vital reading arrives and then run it through the rules. For the sake of this example, let’s assume that every event carries all of a patient’s vitals, and skip the snapshot/state-building step.
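
Although that snapshot/state-building step is skipped in this example, a rough sketch of what it might look like with the HBase client API follows; the patient_state table, the v column family, and the double-encoded values are assumptions for illustration, not code from the example project:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get, Put}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._

    object PatientState {
      // Hypothetical table keeping the latest reading of each vital, one row per patient
      private val conn  = ConnectionFactory.createConnection(HBaseConfiguration.create())
      private val table = conn.getTable(TableName.valueOf("patient_state"))
      private val cf    = Bytes.toBytes("v")

      // Write the new reading, then read the whole row back to rebuild the patient's current state
      def mergeVital(patientId: String, vital: String, value: Double): Map[String, Double] = {
        val put = new Put(Bytes.toBytes(patientId))
        put.addColumn(cf, Bytes.toBytes(vital), Bytes.toBytes(value))
        table.put(put)

        val row = table.get(new Get(Bytes.toBytes(patientId)).addFamily(cf))
        row.getFamilyMap(cf).asScala.map { case (q, v) =>
          Bytes.toString(q) -> Bytes.toDouble(v)
        }.toMap   // this reconstructed state is what you would run through the rules
      }
    }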

Once a patient meets the SIRS criteria, you have to check for sepsis and then severe sepsis, septic shock, and so on, in that order. Here is a flow chart of the evaluation process:

[Figure: Flow chart of the sepsis evaluation process]

Drools rules can be expressed in either a DRL file or an Excel file. To capture all these conditions and keep the demo business-user friendly, you will use Drools decision tables. This approach also makes the business logic visible to a wider audience (including business analysts), rather than embedding it in Java/Scala code or custom syntax. (A sketch of loading such a table follows the cell descriptions below.)

Here is a decision table that’s built to satisfy the sepsis calculator.

[Figure: Drools decision table for the sepsis calculator]

In the above picture:

  • All the light red-shaded cells are links back to code.
  • All the orange-shaded cells contain the values set after a rule is successfully evaluated.
  • All the green-shaded cells contain ranges/values on the incoming data to satisfy a given rule.
  • All the blue-shaded cells are names of rules and their conditions.
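
For reference, here is one way such a decision-table spreadsheet might be loaded into a stateless Drools session. The sepsis_rules.xls file name and the RuleSessionFactory object are assumptions for this sketch (it requires drools-compiler and drools-decisiontables on the classpath), and the lazy, per-JVM initialization anticipates the reuse discussed in the coding steps below:

    import org.kie.api.KieServices
    import org.kie.api.runtime.StatelessKieSession

    object RuleSessionFactory extends Serializable {
      // Built lazily once per JVM (i.e., once per Spark executor) and then reused
      @transient lazy val session: StatelessKieSession = {
        val ks  = KieServices.Factory.get()
        val kfs = ks.newKieFileSystem()
        kfs.write(ks.getResources.newClassPathResource("sepsis_rules.xls"))   // the decision table
        ks.newKieBuilder(kfs).buildAll()                                      // compiles the spreadsheet into rules
        ks.newKieContainer(ks.getRepository.getDefaultReleaseId).getKieBase.newStatelessKieSession()
      }

      // Fire all rules against a single fact; the rules set result fields on the fact itself
      def evaluate[T <: AnyRef](fact: T): T = { session.execute(fact); fact }
    }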

Here are a few goals for the Spark and Drools integration:

  • Make rules execution seamless from Spark/Streaming.
  • Use the stateless part of the rules engine for simplicity. You can use Spark’s sliding windows to store states that extend over a period of time.
  • Run the rules either in a sequential order or at random based on the requirements.
  • Get the results of rules execution back into a Spark DataFrame to compute some metrics.

Coding

The goals above result in the following steps and snippets of code. You can get the complete code from https://github.com/mganta/sprue.

  1. Initialize the rules session factory once per partition; it is reused for all subsequent DStream executions.
  2. Store the incoming data in HBase.
  3. For each event in the RDD, execute all the rules and return an RDD with evaluation results.
  4. Convert the RDD into a DataFrame and compute some metrics.
  5. Store the updates in HBase.
  6. Call the time-series REST API and post the micro-batch metrics; your time-series dashboard can read this data. (A rough sketch follows this list. You can learn how to install OpenTSDB here.)
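
A rough sketch of step 6, posting one counter to OpenTSDB’s /api/put endpoint, might look like this (the host, port, metric name, and tags are made up for illustration):

    import java.net.{HttpURLConnection, URL}
    import java.nio.charset.StandardCharsets

    // Post a single data point to OpenTSDB; it answers 204 No Content on success
    def postMetric(metric: String, value: Long, tags: Map[String, String]): Int = {
      val tagJson = tags.map { case (k, v) => s""" "$k":"$v" """.trim }.mkString(",")
      val body =
        s"""{"metric":"$metric","timestamp":${System.currentTimeMillis / 1000},"value":$value,"tags":{$tagJson}}"""

      val conn = new URL("http://opentsdb-host:4242/api/put").openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setRequestProperty("Content-Type", "application/json")
      conn.setDoOutput(true)
      val out = conn.getOutputStream
      out.write(body.getBytes(StandardCharsets.UTF_8))
      out.close()
      conn.getResponseCode
    }

    // e.g. postMetric("sepsis.patient.count", 3, Map("state" -> "severe_sepsis"))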

All the steps above are linked in the Spark driver code here.
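
Condensed into one place, the driver flow could look roughly like the sketch below; the Patient case class and the RuleSessionFactory object are the hypothetical names from the earlier sketches, not the actual classes in the repository:

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.dstream.DStream
    import scala.beans.BeanProperty

    // Hypothetical event type; the rules would set result fields such as sirs via the generated setter
    case class Patient(id: String, temperature: Double, heartRate: Int,
                       respiratoryRate: Int, wbc: Double,
                       @BeanProperty var sirs: Boolean = false)

    def process(patients: DStream[Patient], sqlContext: SQLContext): Unit = {
      import sqlContext.implicits._

      // Steps 1 and 3: evaluate the rules partition by partition; the stateless session
      // is built lazily once per executor JVM and reused across micro-batches
      val evaluated = patients.mapPartitions(iter => iter.map(p => RuleSessionFactory.evaluate(p)))

      evaluated.foreachRDD { rdd =>
        val df     = rdd.toDF()                          // step 4: RDD -> DataFrame
        val counts = df.groupBy("sirs").count()          // step 4: grouped metrics
        counts.show()
        // steps 2, 5, and 6: persist events/updates to HBase and post the counts to OpenTSDB
      }
    }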

This example uses QueueStream to generate streams of random patient data. In a real scenario, each event would be an HL7 message. Running the example, you can see that the rules are executed for each incoming event, and grouped metrics for each state are printed out for every micro-batch.
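
A random feed like that could be wired up with queueStream roughly as follows, reusing the hypothetical Patient case class from the sketch above (the value ranges are made up):

    import scala.collection.mutable
    import scala.util.Random
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream

    // Pre-build a queue of RDDs of random patient vitals; one RDD is consumed per micro-batch
    def randomPatientStream(ssc: StreamingContext, batches: Int): DStream[Patient] = {
      val queue = mutable.Queue[RDD[Patient]]()
      for (_ <- 1 to batches) {
        val batch = (1 to 100).map { i =>
          Patient(s"patient-$i",
                  temperature     = 96 + Random.nextDouble() * 8,
                  heartRate       = 60 + Random.nextInt(60),
                  respiratoryRate = 10 + Random.nextInt(20),
                  wbc             = 3 + Random.nextDouble() * 15)
        }
        queue += ssc.sparkContext.parallelize(batch)
      }
      ssc.queueStream(queue)
    }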

Alternatively, if you set up an OpenTSDB dashboard, you would see something like this.

[Figure: OpenTSDB dashboard showing the grouped metrics]

Conclusion

When designing complex systems, using a rules engine can be a helpful choice; logic and data separation results in a flexible design while giving domain experts insight into decision logic. As you have seen here, combining CDH (for Spark, HBase, Kafka) with a rules engine can help you evaluate complex business logic and also act upon it in near-real time.

Madhu Ganta is a Solutions Architect at Cloudera.


8 responses on “How-to: Build a Complex Event Processing App on Apache Spark and Drools”

  1. Buntu

    Thanks for the post, really helpful!
    How would this design differ if one would like to utilize an HBase coprocessor to evaluate the rules for each put?

    1. Michael Segel

      First, I would avoid co-processors like the plague. Especially if you’re dealing with PII.

      IMHO, I’d think you’d want to consider stream processing on the inbound data before it gets to HBase, but that’s just me. ;-)
      If you wanted to do this as a coprocessor, you could do it either pre-insert or post-insert; I’d opt for a pre-insert check so that you can add, or rather set, a value if SIRS and sepsis are present.

  2. Michael Segel

    I think that the flow chart should be re-written for clarification.

    The first set of checks for SIRS should be written as a count that gets incremented after each check if true.
    So you have count=0, then the first check… if true, then count++. Then both flows go to the next count.
    This is actually how you would code it anyway… ;-) Then you go into the next conditional block.

    HTH

  3. Arvind

    Thank you for your post.
    I am trying to run your demonstration with some changes. I am using a Hive table as input data, but I am facing an issue.
    Please check my changes below and help me resolve it:
    // function which returns products from a Hive table
    def getProductsList(a: Int, hiveContext: org.apache.spark.sql.hive.HiveContext): scala.collection.mutable.MutableList[Product] = {
      val products = scala.collection.mutable.MutableList[Product]()
      val results = hiveContext.sql("select item_id,value from details where id=12");
      val collection = results.collect();
      var i = 0;
      results.collect.foreach(t => {
        val product = new Product(collection(i)(0).asInstanceOf[Long], collection(i)(1).asInstanceOf[String]);
        i = i + 1;
        products += product
      })
      products
    }
    Calling the getProductsList function and applying Drools rules on the products:

    val randomProducts = this.getProductsList(1, hiveContext)
    val rdd = ssc.sparkContext.parallelize(randomProducts)
    val evaluatedProducts = rdd.mapPartitions(incomingProducts => {
      print("Hello");
      rulesExecutor.evalRules(incomingProducts)
    })
    val productdf = hiveContext.applySchema(evaluatedProducts, classOf[Product])
    })

    The above code is not entering the rdd mapPartitions and is throwing the following error, but I am sure the rdd is not empty.

    Exception in thread "main" java.lang.NullPointerException
    at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:103)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:102)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:102)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:47)
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:995)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:488)
    at org.apache.spark.sql.SQLContext.applySchema(SQLContext.scala:1028)
    at com.cloudera.sprue.ValidateEan$.main(ValidateEan.scala:70)
    at com.cloudera.sprue.ValidateEan.main(ValidateEan.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    16/05/05 07:44:48 INFO SparkContext: Invoking stop() from shutdown hook

  4. Tony D.

    Thanks for the post.
    Few questions:
    How would your solution scale if the rules are unique per patient?
    Would you still do it this way if the flow of events were not predictable and you occasionally got big bursts (with finite cores), potentially causing a long queue?
    Complexity aside, I feel an Akka-based platform with better controls on scale, routing, and caching would scale much better.

  5. Ank

    Thanks for the post.
    Very helpful. :)
    Can anyone please tell me how to reload the Drools rules in Spark cluster mode?
    I have the same scenario in my project as in the above example, only I need to reload my rules after some time.

    Thanks in advance.

  6. Rashmi

    Hello there – Is Drools certified by Cloudera? If not, would you know how Drools can be supported?
    Thanks,
    Rashmi