How Cigna Tuned Its Spark Streaming App for Real-time Processing with Apache Kafka

Categories: Kafka Spark Use Case

Explore the configuration changes that Cigna’s Big Data Analytics team has made to optimize the performance of its real-time architecture.

Real-time stream processing with Apache Kafka as a backbone provides many benefits. For example, this architectural pattern can handle massive, organic data growth via the dynamic addition of streaming sources such as mobile devices, web servers, system logs, and wearable device data (aka, “Internet of Things”). Kafka can also help capture data in real-time and enable the proactive analysis of that data through Spark Streaming.

At Cigna Corporation, we have implemented such a real-time architecture based on Kafka and Apache Spark/Spark Streaming. As a result, we can capture many different types of events and react to them in real-time, and utilize machine-learning algorithms that constantly learn from new data as it arrives. We can also enrich data that arrives in batch with data that was just created in real-time.

This approach required significant tuning of our Spark Streaming application for optimal performance, however. In the remainder of this post, we’ll describe the details of that tuning. (Note: these results are specific to Cigna’s environment; your mileage may vary. But, they provide a good starting point for experimentation.)

Architecture Overview

As you may know, Spark Streaming enables the creation of real-time complex event processing architectures, and Kafka is a real-time, fault-tolerant, highly scalable data pipeline (or pub-sub system) for data streams. Spark Streaming can be configured to consume topics from Kafka, and create corresponding Kafka DStreams. Each DStream batches incoming messages into an abstraction called the RDD, which is an immutable collection of incoming messages. Each RDD is a micro-batch of the incoming messages, and the micro-batching window is configurable.

As illustrated below, most of the events we track originate from different types of portals (1). When users visit monitored pages or links, those events are captured and sent to Kafka through an Apache Flume agent (2). A Spark Streaming application then reads that data asynchronously and in parallel (3), with the Kafka events arriving in mini-batches (one-minute window).  The streaming application parses the semi-structured events, and then enriches them with other data from a large (125 million records) Apache Hive table (4). This table is read from Hive via Spark’s HiveContext and cached in memory. It also parses the rest of the event using the DataFrame API to build a structure around it. Once the record is built, it persists the DataFrame as a row in a Hive table. This same information can also be written back to a Kafka topic (5); while the physical layout of the files in the table is Apache Parquet, the rows are served up as a JSON response over HTTP to enterprise services. The enriched events can then become available to other applications or other systems in real time over Kafka.

This data is accessible through a RESTful API or JDBC/ODBC (6). For example, using the Impyla API for Apache Impala (incubating), we can make the data accessible as JSON over HTTP7mdash;a simple but effective low-level data service. Using dashboard creation tools like Looker and Tableau over Impala also helps users query and visualize live event data that has been enriched and processed in real time.


Summary of Optimizations

When we first deployed the entire solution, the Kafka and Flume components were performing well. But the Spark Streaming application was taking nearly 4-8 minutes, depending on resources allocated, to process a single batch. This latency was due to the use of DataFrames to enrich the data from the very large Hive table mentioned previously, and due to various undesirable configuration options.

To optimize our processing time, we started down two paths: First, to cache data and partition it where appropriate, and second to tune the Spark application via configuration changes. (We also packaged this application as a Cloudera Manager service by creating a Custom Service Descriptor and parcel, but that step is out of scope for this post.)

The spark-submit command we use to run the Spark app is shown below. It reflects all the options that, together with coding improvements, resulted in significantly less processing time: from 4-8 minutes to under 25 seconds.

Next, we’ll describe the configuration changes and caching approach in detail.

Driver Options

Note that the driver is being run on the cluster and that we are running Spark on YARN. Because Spark Streaming apps are long running, the log files generated can be very large. To solve for this issue, we limited the number of messages written to the logs and used the RollingFileAppender to limit their maximum size. We also disabled console log messages by turning off the spark.ui.showConsoleProgress option.

Also, during testing, our driver frequently ran out of memory due to the permanent generation space filling up. (The permanent space is where the classes, methods, internalized strings, and similar objects used by the VM are stored and never de-allocated.) Increasing the permanent space to 6GB solved the problem:

Garbage Collection

Because our streaming application is a long-running process, after a period of processing time, we noticed long GC pauses that we wanted to either minimize or keep in the background. Adjusting the UseConcMarkSweepGC
parameter seemed to do the trick:

(The G1 collector is being considered for a future release.)

Disabling Tungsten

Tungsten is a major revamp of the Spark execution engine, and, as such, could prove to be problematic in its first release. Disabling Tungsten simplified our Spark SQL DAGs somewhat. We will re-evaluate Tungsten in the future when it is more hardened for version 1.6, especially if it optimizes shuffles.

We disabled the following flags:

Enabling Backpressure

Spark Streaming has trouble with situations where the batch-processing time is larger than the batch interval. In other words, Spark will not be able to read data from the topic faster than it arrives—the Kafka receiver for the executor won’t be able to keep up. If this throughput is sustained for long enough, it leads to an unstable situation where the memory of the receiver’s executor overflows.

Setting the following alleviated this issue:

Adjusting Locality and Blocks

These two options are complementary: One determines how long to wait for the data to be local to a task/executor, and the other is used by the Spark Streaming receivers to chunk data into blocks. The larger the data blocks the better, but if the data is not local to the executors, it will have to move over the network to wherever the task will be executed. We had to find a good balance between these two options because we don’t want the data blocks to be large, nor do we want to wait too long for locality. We want all the tasks in our application to finish within seconds.

Thus, we changed the locality option to 1 second from the default 3 seconds, thereby enabling one of the 20 executors to start after 1 second has passed.  We also changed the block interval to 1.5 seconds.

Consolidating Intermediate Files

Enabling this flag is recommended for ext4 filesystems because it results in fewer intermediate files, thereby improving filesystem performance for shuffles.

Enabling Executor Performance

While configuring a Kafka DStream, you can specify the number of parallel consumer threads. However, the consumers of a DStream will run on the same Spark Driver node. Thus, to do parallel consumption of a Kafka topic from multiple machines, you have to instantiate multiple DStreams. Although one approach would be to union the corresponding RDDs before processing, we found it cleaner to run multiple instances of the application and make them part of the same Kafka consumer group.

To do that, we enabled 20 executors and 20 cores per executor.

We provided about 8GB memory to each executor to ensure that cached data remains in memory, and that there is enough room for the heap to shrink and grow. Caching reference datasets in memory helps tremendously when we run heavy DataFrame joins. This approach also speeds up processing of batches: 7,000 of them in 17-20 seconds, according to a recent benchmark.

Caching Approach

Cache the RDD before using it, but remember to remove it from cache to make room for the next batched iteration. Also, caching any data that is used multiple times, beyond the foreach loop, helps a lot. In our case, we cached the 125 million records in our Hive table as a DataFrame, partitioned that data, and used it in multiple joins. That change shaved nearly 4 minutes from total batch-processing time.

However, don’t make the number of partitions too large. Rather, keeping the number of partitions low will reduce the number of tasks and keep scheduling delays to a minimum. It will also ensure that larger chunks of data are processed with minimal delays. To confirm that the number of executors is proportional to the partitions we have, we simply kept the partitions at:

# of executors * # of cores = # of partitions

For instance, (20 * 20) = 400 partitions. Once the RDD is no longer needed in memory, rdd.unpersist() is called to swap it back out to disk.

(Note: The DataFrame API, although very effective, leaves a lot of processing to the underlying Spark system. In testing, we found that using the RDD API reduced processing time and excessive shuffling.)


Thanks to these changes, we now have a Spark Streaming app that is long running, uses resources responsibly, and can process real-time data within a few seconds. It provides:

  • Near real-time access to data + a view of history (batch) = all data
  • The ability to handle organic growth of data
  • The ability to proactively analyze data
  • Access to “continuous” data from many sources
  • The ability to detect an event when it actually occurs
  • The ability to combine events into patterns of behavior in real-time

As next steps, we are now looking at ways to further optimize our joins, simplify our DAGs, and reduce the number of shuffles that can occur.  We are also experimenting with Kafka Direct Streams, which may give us the ability to control data flow and implement redundancy and resilience through check-pointing.

While Cigna’s journey with Kafka and Spark Streaming is only beginning, we are excited about the doors it opens in the realm of real-time data analytics, and our quest to help people live healthier, happier lives.

Mohammad Quraishi (@AtifQ) is a Senior Principal Technologist at Cigna Corporation and has 20 years of experience in application architecture, design, and development. He has specific experience in mobile native applications, SOA platform implementation, web development, distributed applications, object-oriented analysis and design, requirements analysis, data modeling, and database design.

Jeff Shmain is a Senior Solutions Architect at Cloudera.


19 responses on “How Cigna Tuned Its Spark Streaming App for Real-time Processing with Apache Kafka

  1. JR

    I’m working on a project involves streaming data analytics using IBM Streams for data ingestion and redis for enrichment. Neither Streaming nor redis is my expertise, so bear with me the silly questions below:
    (1) My limited knowledge about Spark Streaming is that, it’s micro-batch based thus from performance’s standpoint it’s relatively slower than competing solutions like window-based or message-based, not to mention the potential risk when processing time is larger than batch interval. So I wonder the reasons why Spark Streaming is chosen over other streaming products.
    (2) As mentioned above, in our project for data enrichment purpose, we use redis which we believe performance-wise would be way better than Hive due to the in-memory vs. HDFS + MapReduce technologies underneath. I understand that there is 125+ million records in the Hive table and the table might have many columns. Still this can be easily addressed using say, redis.

  2. Gwen Shapira

    Cool use-case, guys and thanks for all the optimization details!
    You mention using backpressure to prevent Kafka Receivers from overflowing. Why not use SparkStreams direct kafka mode and avoid the receivers completely?

    1. Mohammad Quraishi


      Thank you for the comment. In fact, using direct streams is exactly where we are headed to avoid all that headache. We also want to use the native exactly-once semantics from kafka. We will enable spark streaming checkpointing to add better fault tolerance if the driver dies. Thanks for that great observation.

  3. Jeff Shmain

    Hi JR,

    Thanks for your questions. There is a large philosophical discussion when it makes sense to use Event Driven systems vs Micro-Batch type of systems. There are compelling reasons to use one over the other. I would encourage you to read a blog by a colleague of mine, Ted Malaska, that would be a great introduction: The last section “Complex Topology for Aggregations or ML” describes most of the reasons why we chose Spark Streaming in this case. We needed to use SQL, Spark Ease of use, state and all of those good things.

    To answer your second question, 125 Million record tables was not only implemented for this use case. There are many other consumers that access that data and some of them need SQL. We just piggy backed on that data already being there. In addition, there is a good amount of SQL we used in the Spark Streaming app itself, with joins, group by etc. Using Spark SQL within Spark Streaming application was just a very easy and straight forward way to implement this within the same platform. The alternative would be implementing another platform (such as Redis, loading all of the reference data in, figuring out how to do group bys, and joins without proper SQL)

  4. Ken Tam

    Thanks for sharing the tuning tips. Here are a few questions:
    On “parallel consumption of a Kafka topic from multiple machines”, the parallelism of Kafka message consumption is determined by the number of partitions from a given topic in that only one consumer can be consuming from each partition to guarantee processing order of messages. I am not sure how a Kafka topic being consumed from multiple machines would help in terms of parallel consumption.
    On disabling Tungsten, were there specific issues to drive this decision?

    1. Mohammad Quraishi

      Hey Ken,
      The number of partitions determines how many parallel threads can read from a Kafka topic depending on the number of consumers in the consumer group. Each of our instances used the same consumer group name effectively making them parallel reads from topic partitions. We had 8 partitions and 4 consumers. But we stopped doing that as we were fast enough to keep up with the messages arriving with no queueing at all in the streaming app. So we really only have one instance using kafka direct stream. With directStream, Spark Streaming will create as many RDD partitions (8 in our case) as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is a one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune. So no more messy parallel logic that you’d need with receivers. Michael Noll has a great read on this. I assure you’ve seen it.
      For tungsten, We read many issues that people reported on the spark user group (please see the threads on that) with versions prior to 1.6 including issues with stability so we just stayed away. With Spark 1.6, Tungsten is enabled by default and we will be switching soon.

  5. Buntu

    Thanks for this write up!

    Can you provide more details about the HTTP7mdash thats mentioned as the data service, thanks!

    1. Mohammad Quraishi

      The REStful service is really a json over http app written in Django which uses Impyla to stream data from Impala and converts it JSON and then sends over HTTPS. It is really a low level service meant for efficient data return. There are obviously steps we;ve taken to ensure we don’t stream too many records. We also provide filters on the service that are translated into “where” clauses in SQL to tune the information returned.

  6. Raj

    Why do we need to use flume here along with Kafka … as you know that Kafka can collect the streaming data like flume too?
    Hence, here we can go with only Kafka. Is it not ?
    Please advise
    Thanks & Regards,

    1. Mohammad Quraishi

      Hi Raj,
      Great observation. We can for sure only use Kafka. The Flume portion is really there to fill in some technology related gaps in our click stream service. As you know, the publisher has to be capable of writing to kafka directly. In this case it was not.

    2. Jeff Shmain


      Kafka is a pub/sub application/message bus in this case. It needs producers to produce data and consumers to consume it. Flume in this case is a collector of clickstream data and a producer to Kafka cluster. Spark Streaming app is the consumer from Kafka cluster.

  7. Heman Kodappully

    Great article on Spark Streaming and Kafka, Mohammad & Jeff. I especially appreciate the Spark configuration details, the tuning tips, and the information on the key challenges that you faced . Thanks for sharing. I have a few questions/comments:
    1) How large is your cluster and is on your premises? Is the cluster dedicated to this Spark application, or is it shared with other YARN applications? In case of the later, how do you ensure this cluster gets the resources it needs?
    2) How many messages/events does the application process per second (or per day)? I assume 20 executors with 20 cores is able to process these messages adequately.
    3) Is 25 seconds the end-to-end latency from Flume/Kafka producer to seeing the event in the consuming BI/Analytics application? Which of the components accounts the major portions of this 25 second latency? I assume the use case does not require even faster processing?
    4) I understand there are some performance challenges with Spark Tungsten, but I was not aware of any major performance issues with Spark Data Frame API. How significant were these issues? Were you able to confirm the issue was with DataFrame API, and not something else (such as some configuration or some expensive joins with the 125 million row Hive table)? From some of the benchmarks I had seen, the Data Frame API has better performance, so I am really curious about this one.

    1. Jeff Shmain

      Hi Heman,

      Thanks for your questions. Unfortunately some of them are not things we can share due to confidentiality constraints, as its getting into too much details into Cigna’s implementation. Feel free to ping me offline to talk about your particular use case and what would be required there.

  8. Jun

    Thanks for publishing this great story! Can you elaborate more on your Caching Approach ? You cached the large Hive table as DataFrame, then partitioned the data. How did you partition the data ? Since you are using Spark 1.5.2, you can only set the number of partitions in DataFrame’s repartition() method like df.repartition(400). You cannot specify the particular columns that you want to partition by (this functionality is only available in Spark 1.6). can you elaborate what specific steps have you taken to keep the table join operation under 20 seconds ?

    You mentioned “keeping the number of partitions low” like 400, this number of partitions is applied to the DataFrame with 125 million records, or to the input streaming RDD, or both? You mentioned 7,000 of records in one test batch. 400 partitions for 7000 records is a lot of partitions. Can you clarify this ?


  9. Hema


    Great article, thanks for this.
    You mentioned driver memory to be 8G, but what is so much memory intensive operation on the driver side, that needs so much memory. As far as I understand, majority of your operations are happening on executor side. Any hints would be really helpful.

  10. sati

    Seems like it creates many small files, like one file for each record under Hive table since spark streaming persists one record once it was built. How do you overcome this small files under Hive table like running some background compaction job periodically or with any other approach?