Robust Message Serialization in Apache Kafka Using Apache Avro, Part 2

Robust Message Serialization in Apache Kafka Using Apache Avro, Part 2

Implementing a Schema Store

In Part 1, we saw the need for an Apache Avro schema provider but did not implement one. In this part we will implement a schema provider that works with Apache Kafka as storage.

In-Memory SchemaStore

First we can implement an in-memory store for schemas. This is useful to understand the requirements for such a store and as the cache of the Kafka backed store. A SchemaStore has to be quick in looking up VersionedSchema entries. That’s why we create a separate map to help each lookup method. Using ConcurrentHashMap allows us to access these maps from multiple threads without locking.

public class InMemorySchemaStore implements SchemaStore {

  private final Map<Integer, VersionedSchema> schemasById = new
  private final Map<SchemaNameWithVersion, VersionedSchema>
      schemasByNameAndVersion = new ConcurrentHashMap<>();
  private final Map<String, VersionedSchema> schemasByParsingForm =
      new ConcurrentHashMap<>();


  public VersionedSchema getMetadata(Schema schema) {
    return schemasByParsingForm

Adding a new schema to all three maps does not need to be atomic, because we do not expect the same store to be used in different use cases.

public void add(VersionedSchema schema) {
  schemasById.put(schema.getId(), schema);
  schemasByNameAndVersion.put(new SchemaNameWithVersion(schema.getName(),
      schema.getVersion()), schema);
      .toParsingForm(schema.getSchema()), schema);

Reading from and Writing to a Kafka Topic

The other half of our Kafka based SchemaProvider is a class that can perform all the communication with Kafka. This does not have to be tied to the concept of schemas, it can be a generic piece of code. We want it to read all schemas at startup and continue polling for new schemas. For this reason, we set up the consumer like this:

  • = false, because we will re-read all schemas at startup anyway
  • Manually assign all partitions to the consumer to avoid sharing messages with another consumer accidentally having the same
  • Seek to the oldest message before reading
  • Poll until the latest record is read before allowing usage of the schema provider
  • Keep polling in a background thread to receive new schemas
class KafkaTopicStore<T> {


  public void startAndCatchUp() {
    List<PartitionInfo> partitions = consumer.partitionsFor(topic);"Start working with topic " + partitions);
    List<TopicPartition> topicPartitions =
        .map(p -> new TopicPartition(topic, p.partition()))
    Map<TopicPartition, Long> initialOffsets = consumer
        .endOffsets(topicPartitions);"Offsets to catch up with: " +  initialOffsets);
    final Set<TopicPartition> partitionsCaughtUp = new HashSet<>();
    if (initialOffsets.values().stream().mapToLong(v -> v).sum() > 0) {
      Predicate<ConsumerRecord<Void, T>> allCaughtUp = r -> {
        TopicPartition topicPartition = new
            TopicPartition(topic, r.partition());
        logger.debug("Read partition, offset: " + r.partition()
            + ",  " + r.offset());
        if (r.offset() >= initialOffsets.get(topicPartition) - 1) {
        return partitionsCaughtUp.size() == initialOffsets.size();
    pollerThread = new Thread(() -> pollUntil(r -> false), getClass().getSimpleName() + "-poller");

KafkaTopicSchemaProvider does no more than glue together InMemorySchemaStore and KafkaTopicStore. We need to configure the store to send and poll for VersionedSchemas. Also we need to make sure all new schemas are added to the cache. Whenever a new schema is read, it is added to the InMemorySchemaStore.

To ensure that we do not lose schemas over time we also need to set and retention.bytes=-1 for the topic that stores our schema information. This works with Kafka (CDK 2.0) and newer. For older brokers we can just set a “big enough” retention time.

Administering the Schema Store

We need a command line tool to add, list, and print schemas. To achieve this, we create a KafkaTopicSchemaProvider based on the command line arguments and call the appropriate methods to read or write schema information.

OptionSet options = parser.parse(args);

if (options.has(ADD)) {
  // Error handling omitted
   KafkaTopicSchemaTool tool = new KafkaTopicSchemaTool(createProvider(options));
   VersionedSchema schema = tool.addSchema(options.valueOf(NAME).toString(),
   System.out.println("Successfully added schema.");
} else if (options.has(LIST)) {
  // ...

One crucial problem is schema identifier generation. In Kafka we do not have Sequence objects like in RDBMSs. Thus, we need to make sure we come up with a unique integer for each schema we add. One simple solution is to find the next available positive integer. In our example, we do not have any way to prevent two administrators to add schemas at about the same time with the same identifier. To prevent this, we can

  • Use a Kafka topic with a single partition as a sequence: produce a single message and use its offset
  • Use an ephemeral ZooKeeper node to “lock”
  • Introduce a service to add schemas. This application can lock in memory.
  • Allow writing to the topic storing schemas to a principal that only a few have access to and delegate the responsibility

Next Timeā€¦

Part 3 will show how to make your Kafka clients aware of schema versions by accessing the schema store.


Andras Beni
More by this author

Leave a comment

Your email address will not be published. Links are not permitted in comments.