Offset Management For Apache Kafka With Apache Spark Streaming

Categories: CDH Kafka Spark

An ingest pattern that we commonly see adopted by Cloudera customers is Apache Spark Streaming applications that read data from Kafka. Streaming data continuously from Kafka has many benefits, such as the capability to gather insights faster. However, users must take into consideration management of Kafka offsets in order to recover their streaming application from failures. In this post, we will provide an overview of offset management and the following topics.

  • Storing offsets in external data stores
    • Checkpoints
    • HBase
    • ZooKeeper
    • Kafka
  • Not managing offsets

Overview of Offset Management

Spark Streaming integration with Kafka allows users to read messages from a single Kafka topic or multiple Kafka topics. A Kafka topic receives messages across a distributed set of partitions where they are stored. Each partition maintains the messages it has received in a sequential order where they are identified by an offset, also known as a position. Developers can take advantage of using offsets in their application to control the position of where their Spark Streaming job reads from, but it does require offset management.

Managing offsets is most beneficial to achieve data continuity over the lifecycle of the stream process. For example, upon shutting down the stream application or an unexpected failure, offset ranges will be lost unless persisted in a non-volatile data store. Further, without offsets of the partitions being read, the Spark Streaming job will not be able to continue processing data from where it had last left off.

[figure 1 - high-level flow for managing offsets]


The above diagram depicts the general flow for managing offsets in your Spark Streaming application. Offsets can be managed in several ways, but generally follow this common sequence of steps.

  1. Upon initialization of the Direct DStream, a map of offsets for each topic's partition can be specified, telling the Direct DStream where to start reading for each partition.
    1. The offsets specified are in the same location that step 4 below writes to.
  2. The batch of messages can then be read and processed.
  3. After processing, the results can be stored as well as offsets.
    1. The dotted line around the store results and commit offsets actions simply highlights a sequence of steps that users may want to review further if stricter delivery semantics are required. This may include reviewing idempotent operations or storing the results with their offsets in an atomic operation.
  4. Lastly, any external durable data store such as HBase, Kafka, HDFS, and ZooKeeper are used to keep track of which messages have already been processed.

Different scenarios can be incorporated into the above steps depending upon business requirements. Spark’s programmatic flexibility allows users fine-grained control to store offsets before or after periodic phases of processing. Consider an application where the following is occurring: a Spark Streaming application is reading messages from Kafka, performing a lookup against HBase data to enrich or transform the messages and then posting the enriched messages to another topic or separate system (e.g. other messaging system, back to HBase, Solr, DBMS, etc.). In this case, we only consider the messages as processed when they are successfully posted to the secondary system.

Storing Offsets Externally

In this section, we explore different options for persisting offsets externally in a durable data store.

For the approaches mentioned in this section, if using the spark-streaming-kafka-0-10 library, we recommend users set enable.auto.commit to false. This configuration is only applicable to this version, and setting it to true means that offsets are committed automatically with a frequency controlled by the config auto.commit.interval.ms. In Spark Streaming, setting this to true commits the offsets to Kafka automatically when messages are read from Kafka, which doesn't necessarily mean that Spark has finished processing those messages. To enable precise control for committing offsets, set the Kafka parameter enable.auto.commit to false and follow one of the options below.
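For example, a kafkaParams map for the new consumer might look like the following (the broker addresses and group id are placeholders):

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092,broker2:9092", // placeholder brokers
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",                  // placeholder group id
  // disable auto-commit so offsets are committed only after processing
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
```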

Spark Streaming checkpoints

Enabling Spark Streaming’s checkpoint is the simplest method for storing offsets, as it is readily available within Spark’s framework. Streaming checkpoints are purposely designed to save the state of the application, in our case to HDFS, so that it can be recovered upon failure.

Checkpointing the Kafka Stream will cause the offset ranges to be stored in the checkpoint. If there is a failure, the Spark Streaming application can begin reading the messages from the checkpoint offset ranges. However, Spark Streaming checkpoints are not recoverable across application code changes or Spark upgrades, and hence are not very reliable, especially if you are using this mechanism for a critical production application. We do not recommend managing offsets via Spark checkpoints.

Storing Offsets in HBase

HBase can be used as an external data store to preserve offset ranges in a reliable fashion. By storing offset ranges externally, it allows Spark Streaming applications the ability to restart and replay messages from any point in time as long as the messages are still alive in Kafka.

With HBase’s generic design, the application is able to leverage the row key and column structure to handle storing offset ranges across multiple Spark Streaming applications and Kafka topics within the same table. In this example, each entry written to the table can be uniquely distinguished with a row key containing the topic name, consumer group id, and the Spark Streaming batchTime.milliSeconds. Although batchTime.milliSeconds isn’t required, it does provide insight to historical batches and the offsets which were processed. New records will accumulate in the table which we have configured in the below design to automatically expire after 30 days. Below is the HBase table DDL and structure.
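For example, a table with a single column family and a 30-day TTL might be created from the HBase shell as follows (the table and column family names are illustrative):

```
create 'stream_kafka_offsets', {NAME => 'offsets', TTL => 2592000}
```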


RowKey Layout
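Based on the description above, the rowkey and column layout might look like this (the names are illustrative):

```
Row key:        <TOPIC_NAME>:<GROUP_ID>:<BATCH_TIME_MS>
Column family:  offsets
Qualifier:      <PARTITION_ID>
Value:          <LAST_COMMITTED_OFFSET>
```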


For each batch of messages, the saveOffsets() function is used to persist the last read offsets for a given Kafka topic in HBase.
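A sketch of what saveOffsets() might look like, assuming the spark-streaming-kafka-0-8 integration and the rowkey layout described above (table, column family, and parameter names are illustrative):

```scala
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.kafka.OffsetRange

// Persist the end offset of every partition in this batch, keyed by
// topic:groupId:batchTime so each batch's offsets are kept until the TTL expires.
def saveOffsets(topicName: String, groupId: String, offsetRanges: Array[OffsetRange],
                hbaseTableName: String, batchTime: Time): Unit = {
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  try {
    val table = conn.getTable(TableName.valueOf(hbaseTableName))
    val rowKey = s"$topicName:$groupId:${batchTime.milliseconds}"
    val put = new Put(Bytes.toBytes(rowKey))
    for (or <- offsetRanges) {
      put.addColumn(Bytes.toBytes("offsets"),
        Bytes.toBytes(or.partition.toString),
        Bytes.toBytes(or.untilOffset.toString))
    }
    table.put(put)
  } finally {
    conn.close()
  }
}
```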

At the beginning of the streaming job, the getLastCommittedOffsets() function is used to read the Kafka topic offsets from HBase that were last processed when the Spark Streaming application stopped. The function handles the following common scenarios while returning Kafka topic partition offsets.

Case 1: The streaming job is started for the first time. The function queries ZooKeeper to find the number of partitions in a given topic. It then returns ‘0’ as the offset for all the topic partitions.

Case 2: A long-running streaming job had been stopped and new partitions were added to a Kafka topic. The function queries ZooKeeper to find the current number of partitions in a given topic. For all the old topic partitions, offsets are set to the latest offsets found in HBase. For all the new topic partitions, it returns ‘0’ as the offset.

Case 3: A long-running streaming job had been stopped and there are no changes to the topic partitions. In this case, the latest offsets found in HBase are returned as offsets for each topic partition.

When new partitions are added to a topic after the streaming application has started, only messages from the topic partitions that were detected during the start of the streaming application are ingested. For the streaming job to read the messages from newly added topic partitions, the job has to be restarted.
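A sketch of getLastCommittedOffsets() covering the three cases above (the ZooKeeper timeouts, table scan strategy, and helper names are assumptions):

```scala
import scala.collection.JavaConverters._
import kafka.common.TopicAndPartition
import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Result, Scan}
import org.apache.hadoop.hbase.util.Bytes

def getLastCommittedOffsets(topicName: String, groupId: String, hbaseTableName: String,
                            zkQuorum: String): Map[TopicAndPartition, Long] = {
  // Current partition count for the topic, from ZooKeeper
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkQuorum, 10000, 10000)
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)
  val partitionCount = zkUtils.getPartitionsForTopics(Seq(topicName))(topicName).size
  zkClient.close()

  // Newest offsets row for this topic and consumer group, if one exists
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf(hbaseTableName))
  val scanner = table.getScanner(
    new Scan().setRowPrefixFilter(Bytes.toBytes(s"$topicName:$groupId:")))
  var newest: Result = null
  for (r <- scanner.iterator().asScala) newest = r // batchTime keys sort chronologically

  val offsets = (0 until partitionCount).map { p =>
    val cell = if (newest == null) null
               else newest.getValue(Bytes.toBytes("offsets"), Bytes.toBytes(p.toString))
    // Case 1 (no row) and the new partitions of case 2 start at 0;
    // otherwise (cases 2 and 3) resume from the stored offset.
    TopicAndPartition(topicName, p) -> (if (cell == null) 0L else Bytes.toString(cell).toLong)
  }.toMap
  conn.close()
  offsets
}
```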

Once we have the last committed offsets (fromOffsets in this example), we can create a Kafka Direct DStream.
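With the spark-streaming-kafka-0-8 integration, the Direct DStream creation might look like this sketch (kafkaParams and ssc are assumed to be defined as usual):

```scala
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// fromOffsets: Map[TopicAndPartition, Long], the last committed offsets
val inputDStream = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
```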

After completing the processing of messages in a Kafka DStream, we can store topic partition offsets by calling saveOffsets().
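A minimal sketch of that loop, assuming the saveOffsets() helper described earlier (the processing step is a placeholder):

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

inputDStream.foreachRDD { (rdd, batchTime) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch and store the results here ...
  saveOffsets(topicName, groupId, offsetRanges, hbaseTableName, batchTime)
}
```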

You can inspect the stored offsets in HBase for various topics and consumer groups as shown below.
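For example, from the HBase shell (the table, topic, and group names are placeholders):

```
hbase(main):001:0> scan 'stream_kafka_offsets', {ROWPREFIXFILTER => 'topic1:group1:'}
```

Each row holds one column per partition, with the partition's last committed offset as the value.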

The code sample in this section uses the spark-streaming-kafka-0-8 version of the Spark Streaming Kafka integration.
Check out this github link for the complete code sample.

Storing Offsets in ZooKeeper

Users can store offset ranges in ZooKeeper, which can similarly provide a reliable method for starting stream processing on a Kafka stream where it had last left off.

In this scenario, on start-up, the Spark Streaming job will retrieve the latest processed offsets from ZooKeeper for each topic's partition. If a new partition is found that was not previously managed in ZooKeeper, its starting offset defaults to the beginning of the partition. After processing each batch, users have the option to store either the first or the last offset processed. Additionally, the znode location in which the offset is stored in ZooKeeper uses the same format as the old Kafka consumer API. Therefore, any tools built to track or monitor Kafka offsets stored in ZooKeeper still work.

Initialize ZooKeeper connection for retrieving and storing offsets to ZooKeeper.
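A sketch of that initialization using Kafka's ZkUtils (the quorum string, chroot, and timeouts are placeholders):

```scala
import kafka.utils.ZkUtils

val zkUrl = "zkHost1:2181,zkHost2:2181/kafka" // placeholder quorum and chroot
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkUrl, 10000, 10000)
val zkUtils = new ZkUtils(zkClient, zkConnection, false)
```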

Method for retrieving the last offsets stored in ZooKeeper of the consumer group and topic list.
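A sketch of such a method, taking the ZkUtils handle created at startup as a parameter (the method name and defaulting behavior follow the description above):

```scala
import kafka.utils.ZkUtils
import org.apache.kafka.common.TopicPartition

// Returns the next offset to process for every partition of the given topics;
// a partition with no znode yet defaults to 0 (start from the beginning).
def readOffsets(zkUtils: ZkUtils, topics: Seq[String],
                groupId: String): Map[TopicPartition, Long] = {
  val partitionMap = zkUtils.getPartitionsForTopics(topics)
  partitionMap.flatMap { case (topic, partitions) =>
    partitions.map { partition =>
      val offsetPath = s"/consumers/$groupId/offsets/$topic/$partition"
      val (offsetOpt, _) = zkUtils.readDataMaybeNull(offsetPath)
      new TopicPartition(topic, partition) -> offsetOpt.map(_.toLong).getOrElse(0L)
    }
  }.toMap
}
```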

Initialization of Kafka Direct Dstream with the specific offsets to start processing from.
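With the kafka-0-10 integration, this follows the pattern in the Spark documentation, passing the recovered offsets to the Subscribe consumer strategy:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// fromOffsets: Map[TopicPartition, Long], recovered from ZooKeeper at startup
val inputDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets))
```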

Method for persisting a recoverable set of offsets to ZooKeeper.
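A sketch of the persisting method, again taking the ZkUtils handle as a parameter (names are illustrative; the znode layout matches the old consumer API):

```scala
import kafka.utils.ZkUtils
import org.apache.spark.streaming.kafka010.OffsetRange

// Write each partition's first or last processed offset to
// /consumers/[groupId]/offsets/[topic]/[partitionId]
def persistOffsets(zkUtils: ZkUtils, offsets: Seq[OffsetRange],
                   groupId: String, storeEndOffset: Boolean): Unit = {
  offsets.foreach { or =>
    val offsetPath = s"/consumers/$groupId/offsets/${or.topic}/${or.partition}"
    val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
    zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
  }
}
```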

Note: The offsetPath is a ZooKeeper location, represented as /consumers/[groupId]/offsets/[topic]/[partitionId], that stores the value of the offset.

Kafka Itself

With Cloudera Distribution of Apache Spark 2.1.x, spark-streaming-kafka-0-10 uses the new consumer API, which exposes a commitAsync API. Using the commitAsync API, the consumer commits the offsets to Kafka after you know that your output has been stored. The new consumer API commits offsets back to Kafka uniquely based on the consumer's group.id.

Persist Offsets in Kafka
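A sketch following the pattern in the Spark documentation, where stream is the Direct DStream created from KafkaUtils.createDirectStream:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... store the results of the batch first ...
  // commitAsync is asynchronous: the offsets are committed to Kafka
  // sometime after the call returns
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```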

Learn more about this in the Spark Streaming + Kafka Integration Guide.

Note: commitAsync() is part of the kafka-0-10 version of Spark Streaming and Kafka Integration. As noted in Spark documentation, this integration is still experimental and API can potentially change.

Other Approaches

It is worth mentioning that you can also store offsets in a storage system like HDFS. Storing offsets in HDFS is a less popular approach compared to the above options as HDFS has a higher latency compared to other systems like ZooKeeper and HBase. Additionally, writing offsetRanges for each batch in HDFS can lead to a small files problem if not managed properly.

Not managing offsets

Managing offsets is not always a requirement for Spark Streaming applications. One example where it may not be required is when users only need the current data of the streaming application, such as a live activity monitor. In these instances where you don't need to manage offsets, you can set the Kafka parameter auto.offset.reset to either largest or smallest if using the old Kafka consumer, or earliest or latest if using the new Kafka consumer.
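For example, the relevant entry in the kafkaParams map for the new consumer might look like this fragment:

```scala
// new consumer accepts "earliest" or "latest";
// the old consumer uses "smallest" or "largest" instead
"auto.offset.reset" -> "earliest"
```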

When you restart the job with auto.offset.reset set to smallest (or earliest), it will replay the whole log from the beginning (smallest offset) of your topic. With this setting all the messages that are still retained in the topic will be read. This might lead to duplicates depending on your Kafka topic retention period.

Alternatively, if you restart the Spark Streaming job with auto.offset.reset set to largest (or latest), it reads the messages from the latest offset of each Kafka topic partition. This might lead to loss of some messages. Depending on how critical your Spark Streaming application is and the delivery semantics it requires, this might be a viable approach.


All the techniques for managing offsets that we've discussed are intended to help provide control of a Spark Streaming application's Direct DStream. They give users the ability to restore the state of the stream throughout its lifecycle, deal with unexpected failures, and improve the accuracy of results continually being computed and stored.

Learn more in the Spark 2 Kafka Integration documentation or the Spark Streaming + Kafka Integration Guide.

References to additional information on each of the Spark 2.1.0 packages can be found in the docs for spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10.


28 responses on “Offset Management For Apache Kafka With Apache Spark Streaming”

    1. Jordan Hambleton

      Glad you enjoyed the post boris. Figure 1 was created using OmniGraffle. cheers!

  1. swetha kasireddy

    In the state of recovering the job from the offsets stored in the zookeeper, how do you maintain the state of the job for things like updateStateByKey

    1. Jordan Hambleton

      Stateful data can be persisted in a mutable data store. I would suggest something more scalable than ZooKeeper (e.g. HBase can support hundreds of thousands of clients), which has a flexible schema design. ZooKeeper does not need to be highly scalable for maintaining offsets, as offsets are only written by a single client, the Spark driver. However, stateful data would be persisted directly from executors and could have a huge amount of data.

      1. Mário de Sá Vera

        oK Jordan, I see your point ! But what do you mean with stateful data ??? other than the OFFSET information ??? In my case , I do not want the default behaviour of considering a new release a different consumer as Spark does by default… this is very annoying actually. What other information would you consider useful for storing together with the OFFSET ?

  2. Darrel Riekhof

    I’m wondering why you would need both Kafka and Spark Streaming? If you just had data flowing through a flume channel, could you give a quick overview of why you might want to use Kafka, or Spark Streaming, or both?

    1. Jordan Hambleton

      Sure thing. Flume provides the ability to stream data to end points (ie. HDFS, HBase, Solr, …). Subsequently, the data can be read from downstream processing engines for analysis, or taking some type of action based on the results.

      Spark streaming is a continuously running process which provides a way to read the data immediately when it’s received by Kafka and can leverage Spark’s computational framework for data processing for analysis and/or take some type of action based on the results immediately.

      If you’re already using Flume, it’s possible to set up a Kafka Channel, which has benefits over file or memory channels. Further, Flume, with a Kafka Channel and HDFS Sink, can continually write data to HDFS while Spark Streaming can in parallel read from the same stream and capture insights that are time sensitive. There are a large number of use-cases; feel free to check out other postings on our blog for more examples.

  3. Michal W

    Great Post!

    I have one question, that is not entirely clear to me.
    What does happen when I use Kafka itself to manage partitions’ offsets, but some processing will take a considerable amount of time to finish (let’s say 10s), and the batch interval is set to 5 seconds.

    Will it end up in a batch of duplicated messages?

    1. Jordan Hambleton

      Thanks for reading. The short answer is no, the messages are not duplicated per subsequent batches even when they start backing up.

      Upon a subsequent batch interval, the scheduler initializes a job with a range from the prior job’s last offset to the new offsets which are available at that time. The job is then queued and processed in the order it was materialized.

      Typically in Spark Streaming, the idea for a healthy processing stream is to have each batch complete consistently below the batch interval time. There are scenarios where surges may occur, and there are a few different techniques for handling backpressure to throttle the incoming rate (see “Setting the max receiving rate”).

  4. swathi

    Excellent post,
    now its cleared all my assumptions about offset management. i am interested in using zookeeper for offset management.
    is it possible for you to post the java code for store offsets in zookeeper ?

  5. Shyla

    Great article! Thanks for taking the time to share your knowledge. I have having a problem and hope you can help me.
    I am using the spark-streaming-kafka-0-10 library and setting enable.auto.commit to false. I am storing the offsets in Kafka itself. I am doing exactly what you have explained in the Kafka Itself section of your article. But when I start my streaming application, my setting of enable.auto.commit to false is being overridden. I see the warning in the logs – WARN overriding enable.auto.commit to false for executor. Please let me know if you have a way around this. Thanks.

    1. Jordan Hambleton

      Great question, thanks for the note. This is expected, as the driver, not the executors, is responsible for committing offsets when the following function is called: “stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)”. When you see “org.apache.spark.streaming.kafka010.KafkaUtils – overriding enable.auto.commit to false for executor”, it is a safety feature so that consumers initialized on executors do not commit offsets prematurely. Lastly, do note that kafka-0-10 is marked as experimental.

      1. Shyla

        Thanks for the reply Jordan, I really appreciate. In the following code snippet, committing the offsets is done in a try ..catch block. If an exception is raised, my understanding was that the offsets won’t be committed , but that was not the case. I appreciate if you can help me understand why the kafka offsets were committed.
        stream.foreachRDD { rdd =>
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        try {…).saveToCassandra(“keyspace1”, “table1”)
        } catch {
        case ex: Exception => {
        //Log the exception …

        1. Shyla

          Hi Jordan,
          I just wanted to fix my typo. aggregatordatastream should be replaced by stream. Thanks

  6. Smart Solutions

    Hi Jordan/Guru,

    Excellent article. We tried storing the offsets using a checkpoint directory and it worked fine. However, based on the requirements, our application will evolve and we will have code changes, hence there is a possibility of losing messages.
    To avoid that, we plan to store the offsets in Kafka itself, based on the code snippet on this page and the Spark documentation.
    I tried following code :

    val stream:InputDStream[ConsumerRecord[String,String]] = KafkaUtil.createDirectStream(ssc, PreferConsistent, Subscribe[String,String] (Array(“topicName”),kafkaMap))

    stream.foreach { rdd =>
    val offsetRangers : Array[OffsetRanger] = rdd.asInstanceOf[HasOffsetRangers].offsetRanges

    // Filter out the values which have empty values and get the tuple of type
    // ( topicname, stringValue_read_from_kafka_topic) => (“topicName”,x.value)).filter(x=> !x._2.trim.isEmpty).foreachRDD(processRDD _)

    // Sometime later, after outputs have completed.

    def processRDD(rdd:RDD[(String,String)]) {
    // Process futher to hdfs
    When I try to start Streaming application, it does not start and looking at the logs, here is what we see :
    java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:223)
    at org.apache.spark.streaming.dstream.DStream.(DStream.scala:65)
    at org.apache.spark.streaming.dstream.MappedDStream.(MappedDStream.scala:29)

    I am not sure what I am missing here.


  7. zhoucw

    Excellent, Great article! Thanks very much. I’m from China, I have a question, can you help me ?

    The code below can guarantee Exactly-once semantics? Thanks.
    If data has been processed, but the job fails and the offsets have not been saved, what should we do?

    inputDStream.foreachRDD((rdd,batchTime) => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach(offset => println(offset.topic,offset.partition, offset.fromOffset,
    val newRDD = => processMessage(message))

  8. pascal

    Thanks for the great article!
    I decided to use the offset management described in the section “kafka itself”.
    What I expected was that the stream reads messages again and again until it reaches the .commitAsync(offsetRanges). Anyway, whatever I try, it keeps reading each message only once. Even if I comment out the .commitAsync(..) line, the stream only processes each message once. enable.auto.commit is set to false, which is also printed in the consumer-property logging on startup.
    Are there any facts or parameters i am missing or am i expecting a wrong behavior?

    1. Paul Leclercq

      If you use a checkpoint directory, it will only read each message at-least-once. Try to delete the folder

      1. pascal

        Thanks for the hint, but I’m not using a checkpoint directory. If I log the value of javaInputDStream.context().checkpointDir() it says null.

  9. xiaozhang

    Thanks for the great article!
    It worked well the first time using HBase to process offsets. Then I stopped the job and truncated the HBase table “stream_kafka_offsets”. After some days (around 20) I rebooted the job and got the error “Offsets out of range with no configured reset policy for partitions: {topicName-4=0}”. Because the HBase table was empty, the program supplied offset 0; however, it looks like in Kafka offset 0 was out of range. Could you please give some help with this problem? Thanks.

  10. Cristi

    I have one question. You are calling the service to save the offsets (for example for HBase) in the foreachRDD function, on each streaming batch interval – let’s say every 1 second. This means that the data will get saved in HBase (overwritten) with the unchanged values of the offsets while the application is not receiving any data. I know this is not a problem from a functional point of view, but I am thinking that this is a performance killer – even if not a big one. Am I right or not?

  11. Charls Joseph

    I have a question regarding offset management when it comes to spark streaming from multiple partitions on the topic. My topic had 4 partitions and when I stream it through spark, a single ds stream batch would have multiple RDDs. Each RDD has message from 4 partitions and message from single partition is distributed across different RDD as below.
    One batch of RDD -> [RDD1 , RDD2, RDD3 ]

    RDD1 -> p1: 0-5 | p2: 0-3 | p3: 0:0
    RDD2 -> p1: 6-10 | p2: 4-10 | p3: 0:0
    RDD3 -> p1: 11-11 | p2: 11-13 | p3: 0:6

    As given, messages from partition 1 are distributed across 2 RDDs (RDD1 – node 1 & RDD2 – node 2) and messages from partition 2 are distributed across 3 RDDs (RDD1 – node 1, RDD2 – node 2, RDD3 – node 3). Each RDD will be processed by multiple executors (different nodes).
    In this case, managing offsets would be difficult; what would be an effective way of doing it?

    When one of the nodes crashes, how do we track the portion of messages from a single partition which went to that node? Let’s say node 1 crashes and loses the messages from p1: 0-5 & p2: 0-3, but the other nodes process RDD2 and RDD3, which consume the rest of the messages from p1 & p2.