This post contains answers to common questions about deploying and configuring Apache Kafka as part of a Cloudera-powered enterprise data hub.
Cloudera added support for Apache Kafka, the open standard for streaming data, in February 2015 after its brief incubation period in Cloudera Labs. Apache Kafka now is an integrated part of CDH, manageable via Cloudera Manager, and we are witnessing rapid adoption of Kafka across our customer base.
As more customers adopt Apache Kafka, a common set of questions about development and deployment has emerged. In this post, you’ll find answers to most of those questions.
Should I use SSDs for my Kafka Brokers?
Using SSDs instead of spinning disks has not been shown to provide a significant performance improvement for Kafka, for two main reasons:
- Kafka writes to disk are asynchronous. That is, other than at startup/shutdown, no Kafka operation waits for a disk sync to complete; disk syncs always happen in the background. That’s why replicating to at least three replicas is critical: a single replica that crashes will lose any data that has not yet been synced to disk.
- Each Kafka Partition is stored as a sequential write ahead log. Thus, disk reads and writes in Kafka are sequential, with very few random seeks. Sequential reads and writes are heavily optimized by modern operating systems.
How do I encrypt the data persisted on my Kafka Brokers?
Currently, Kafka does not provide any mechanism to encrypt the data persisted on the brokers (i.e., encrypting data at rest). Users can always encrypt the payload of the messages written to Kafka: producers encrypt the data before writing to Kafka, and consumers decrypt the received messages. That approach, however, requires the producers to share encryption protocols and keys with the consumers.
The other option is to use software that provides filesystem-level encryption such as Cloudera Navigator Encrypt, included as part of Cloudera Enterprise, which provides a transparent encryption layer between the application and the filesystem.
Is it true that Apache ZooKeeper can become a pain point with a Kafka cluster?
Older versions of Kafka’s high-level consumer (0.8.1 and older) used ZooKeeper to maintain read offsets (the most recently read offset of each partition of a topic). If there are many consumers simultaneously reading from Kafka, the read/write load on ZooKeeper may exceed its capacity, making ZooKeeper a bottleneck. However, this only occurs in extreme cases, when there are many hundreds of consumers using the same ZooKeeper cluster for offset management.
This issue has been resolved in the current version of Kafka (0.8.2 at the time of this writing). Starting with version 0.8.2, the high-level consumer can use Kafka itself to manage offsets. Essentially, it uses a separate Kafka topic to manage recently read offsets, and thus ZooKeeper is no longer required for offset management. Users still get to choose whether they want offsets managed in Kafka or ZooKeeper, via the consumer config parameter offsets.storage.
Cloudera highly recommends using Kafka to store offsets. However, you may choose to use ZooKeeper to store offsets for backwards compatibility. (You may, for example, have a monitoring console that reads offset information from ZooKeeper.) If you have to use ZooKeeper for offset management, we recommend using a dedicated ZooKeeper ensemble for your Kafka cluster. If a dedicated ZooKeeper ensemble is still a performance bottleneck, you can address the issue by using SSDs on your ZooKeeper nodes.
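As a rough illustration of the offsets.storage choice described above, the sketch below builds a set of high-level consumer settings that store offsets in Kafka. The helper names, group name, and ZooKeeper address are made up for this example. The dual.commit.enabled setting is not discussed in this post; it is the 0.8.2 consumer option for also committing to ZooKeeper while migrating, so check it against the documentation for your version.

```python
def consumer_properties(group_id, zookeeper_connect, migrate_from_zookeeper=False):
    """Build 0.8.2 high-level consumer settings that keep offsets in Kafka.

    The function name and arguments are hypothetical; only the property
    keys are Kafka consumer configuration names.
    """
    return {
        "group.id": group_id,
        "zookeeper.connect": zookeeper_connect,
        # Store offsets in Kafka rather than ZooKeeper.
        "offsets.storage": "kafka",
        # While migrating an existing group, also commit to ZooKeeper.
        "dual.commit.enabled": "true" if migrate_from_zookeeper else "false",
    }


def to_properties_text(props):
    """Render the settings in Java .properties style, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in sorted(props.items()))


if __name__ == "__main__":
    # Placeholder group name and ZooKeeper address.
    print(to_properties_text(consumer_properties("clicks-readers", "zk1:2181/kafka")))
```

If a monitoring console still reads offsets from ZooKeeper, passing migrate_from_zookeeper=True keeps both stores updated during the transition.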
Does Kafka support cross-data center availability?
The recommended solution for cross-data center availability with Kafka is via MirrorMaker. Set up a Kafka cluster in each of your data centers, and use MirrorMaker to do near real-time replication of data between the Kafka clusters.
One architectural pattern when using MirrorMaker is to have one topic per data center (DC) for each “logical” topic. For example, if you want a topic for “clicks” you’ll have “DC1.clicks” and “DC2.clicks” (where DC1 and DC2 are your data centers). DC1 will write to DC1.clicks and DC2 to DC2.clicks. MirrorMaker will replicate all DC1 topics to DC2 and all DC2 topics to DC1. The application in each DC then has access to events written from both DCs. It is up to the application to merge the information and handle conflicts accordingly.
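The per-DC naming convention can be captured in a few small helpers. This is only a sketch: the function names and the DATA_CENTERS list are made up for illustration, and the regular expression is meant as the kind of pattern you might hand to a topic whitelist so that only topics originating in one DC are mirrored.

```python
import re

# Hypothetical data center names, matching the example in the text.
DATA_CENTERS = ["DC1", "DC2"]


def local_topic(dc, logical_topic):
    """Name of the topic that producers in `dc` write to."""
    return f"{dc}.{logical_topic}"


def all_topics(logical_topic, data_centers=DATA_CENTERS):
    """Every physical topic an application must read to see all events."""
    return [local_topic(dc, logical_topic) for dc in data_centers]


def mirror_whitelist(source_dc):
    """Regex matching only the topics that originate in `source_dc`."""
    return re.escape(source_dc) + r"\..*"


if __name__ == "__main__":
    print(all_topics("clicks"))
    print(mirror_whitelist("DC1"))
```

Restricting each mirroring direction to topics with the source DC's prefix keeps mirrored data from being copied back to the cluster it came from.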
Another, more sophisticated pattern is to use local and aggregate Kafka clusters in each DC. This pattern is in use at LinkedIn, and has been described in detail in this blog post written by LinkedIn’s Kafka operations team. (Check out the section “Tiers and Aggregation.”)
What type of data transformations are supported on Kafka?
Kafka does not enable transformation of data as it flows through Kafka. To perform data transformation, we recommend the following methods:
- For simple event-by-event processing, use the Flume Kafka integration, and write a simple Apache Flume Interceptor.
- For complex processing, use Apache Spark Streaming to read from Kafka and process the data.
In either case, the transformed/processed data can be written back to a new Kafka topic (which is useful if there are multiple downstream consumers of the transformed data), or directly delivered to the end consumer of the data.
For a more comprehensive description of real-time event processing patterns, check out this blog post.
How do I send large messages or payloads through Kafka?
Cloudera benchmarks indicate that Kafka reaches maximum throughput with message sizes of around 10KB. Larger messages will show decreased throughput. However, in certain cases, users need to send messages that will be much larger than 10KB.
If the message payload sizes will be on the order of hundreds of MB, we recommend exploring the following alternatives:
- If shared storage is available (HDFS, S3, NAS), place the large payload on shared storage and use Kafka just to send a message with the payload location.
- Handle large messages by chopping them into smaller parts before writing into Kafka, using a message key to make sure all the parts are written to the same partition so that they are consumed by the same Consumer, and re-assembling the large message from its parts when consuming.
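The second alternative, splitting and reassembling, might look roughly like the sketch below. It is independent of any Kafka client library: split_message returns (key, value) pairs that a producer would send, and Reassembler is fed the values a consumer receives. The "id:index:total|" header format, the function and class names, and the chunk size are all assumptions made for this example, not a Kafka convention.

```python
import uuid


def split_message(payload, chunk_size, message_id=None):
    """Split `payload` (bytes) into (key, value) records sharing one key.

    Using the same key for every part lets a key-based partitioner route
    all parts to the same partition. `message_id` must not contain ':'
    or '|', since both are used by the (hypothetical) header format.
    """
    message_id = message_id or uuid.uuid4().hex
    total = max(1, -(-len(payload) // chunk_size))  # ceiling division
    records = []
    for index in range(total):
        part = payload[index * chunk_size:(index + 1) * chunk_size]
        header = f"{message_id}:{index}:{total}|".encode("ascii")
        records.append((message_id.encode("ascii"), header + part))
    return records


class Reassembler:
    """Collects parts on the consumer side until a message is complete."""

    def __init__(self):
        self._parts = {}  # message_id -> {index: bytes}

    def add(self, value):
        """Add one received value; return the full payload once complete."""
        header, part = value.split(b"|", 1)
        message_id, index, total = header.decode("ascii").split(":")
        parts = self._parts.setdefault(message_id, {})
        parts[int(index)] = part
        if len(parts) == int(total):
            del self._parts[message_id]
            return b"".join(parts[i] for i in range(int(total)))
        return None


if __name__ == "__main__":
    original = b"x" * 2500
    reassembler = Reassembler()
    for _key, value in split_message(original, chunk_size=1000):
        result = reassembler.add(value)
    print(result == original)
```

A production version would also need to decide how long to hold incomplete messages and what to do with parts that never arrive; the sketch ignores both.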
While sending large messages through Kafka, keep the following in mind:
Compression configs
- Kafka Producers can compress messages. Ensure compression is turned on via the config parameter compression.codec. Valid options are “gzip” and “snappy”.
Broker configs
- message.max.bytes (default: 1000000): maximum size of a message the broker will accept. Increase this value to accommodate your largest message.
- log.segment.bytes (default: 1GB): size of a Kafka data file. Make sure it’s larger than one message. Default should be fine since large messages should not exceed 1GB in size.
- replica.fetch.max.bytes (default: 1MB): maximum size of data that a broker can replicate. This has to be larger than message.max.bytes, or a broker will accept messages and fail to replicate them, leading to potential data loss.
Consumer configs
- fetch.message.max.bytes (default: 1MB): maximum size of a message a consumer can read. This should be greater than or equal to the message.max.bytes configuration on the broker.
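The relationships between these size settings are easy to get wrong when only one of them is raised. A small check along the following lines can catch that before deployment. The function name and the dictionaries are hypothetical; the defaults are the ones quoted above, and the sketch treats a replica.fetch.max.bytes equal to message.max.bytes as acceptable, which is an assumption on our part.

```python
# Defaults as quoted in this post (sizes in bytes).
BROKER_DEFAULTS = {
    "message.max.bytes": 1000000,
    "log.segment.bytes": 1024 ** 3,
    "replica.fetch.max.bytes": 1024 ** 2,
}
CONSUMER_DEFAULTS = {"fetch.message.max.bytes": 1024 ** 2}


def check_message_size_configs(broker, consumer):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    max_message = broker["message.max.bytes"]
    if broker["replica.fetch.max.bytes"] < max_message:
        problems.append(
            "replica.fetch.max.bytes is smaller than message.max.bytes: "
            "brokers may accept messages they cannot replicate"
        )
    if broker["log.segment.bytes"] <= max_message:
        problems.append("log.segment.bytes must be larger than one message")
    if consumer["fetch.message.max.bytes"] < max_message:
        problems.append(
            "fetch.message.max.bytes is smaller than message.max.bytes: "
            "consumers may be unable to read the largest messages"
        )
    return problems


if __name__ == "__main__":
    # Raising only message.max.bytes (here to 5MB) trips two of the checks.
    broker = dict(BROKER_DEFAULTS, **{"message.max.bytes": 5 * 1024 ** 2})
    for problem in check_message_size_configs(broker, CONSUMER_DEFAULTS):
        print(problem)
```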
A few other considerations:
- Brokers will need to allocate a buffer of size replica.fetch.max.bytes for each partition they replicate. Do the math and make sure the number of partitions * the size of the largest message does not exceed available memory, or you’ll see OOMs.
- Same for consumers and fetch.message.max.bytes: Confirm there’s enough memory for the largest message for each partition the consumer reads.
- Large messages may cause longer garbage collection pauses (as brokers need to allocate large chunks). Keep an eye on the GC log and on the server log. If long GC pauses cause Kafka to lose the ZooKeeper session, you may need to configure longer timeout values for zookeeper.session.timeout.ms.
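The memory math in the first two points is a simple multiplication. As a back-of-the-envelope sketch, with hypothetical function names and made-up partition counts and memory budget:

```python
MB = 1024 ** 2


def fetch_buffer_bytes(partitions, fetch_max_bytes):
    """Worst-case buffer memory: one buffer of `fetch_max_bytes` per partition.

    For a broker, pass the number of partitions it replicates and
    replica.fetch.max.bytes; for a consumer, the number of partitions it
    reads and fetch.message.max.bytes.
    """
    return partitions * fetch_max_bytes


def fits_in_memory(partitions, fetch_max_bytes, available_bytes):
    """True if the worst-case buffers fit within the given memory budget."""
    return fetch_buffer_bytes(partitions, fetch_max_bytes) <= available_bytes


if __name__ == "__main__":
    # Hypothetical broker: 500 replicated partitions, 10MB fetch size,
    # and 4GB of memory set aside for these buffers.
    needed = fetch_buffer_bytes(500, 10 * MB)
    print(needed // MB, "MB needed")
    print("fits:", fits_in_memory(500, 10 * MB, 4 * 1024 * MB))
```

The estimate covers only these fetch buffers; the rest of the process needs memory as well, so treat the result as a lower bound.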
Does Kafka support the MQTT or JMS protocols?
Currently, Kafka does not provide out-of-the-box support for the above protocols. However, users have been known to write adaptors to read data from MQTT or JMS and write to Kafka.
For further information on running Kafka with CDH, download the Deployment Guide or watch the webinar “Bringing Real-Time Data to Hadoop.”
If you have additional questions, please feel free to post them on our community forum.
Anand Iyer is a senior product manager at Cloudera. His primary areas of focus are platforms for real-time streaming, Apache Spark, and tools for data ingestion into the Hadoop platform. Before joining Cloudera, he worked as an engineer at LinkedIn, where he applied machine learning techniques to improve the relevance and personalization of LinkedIn’s Feed.
Gwen Shapira is a Software Engineer at Cloudera, and a committer on Apache Sqoop and Apache Kafka. She has 15 years of experience working with customers to design scalable data architectures.