Robust Message Serialization in Apache Kafka Using Apache Avro, Part 1

Robust Message Serialization in Apache Kafka Using Apache Avro, Part 1

In Apache Kafka, Java applications called producers write structured messages to a Kafka cluster (made up of brokers). Similarly, Java applications called consumers read these messages from the same cluster.  In some organizations, there are different groups in charge of writing and managing the producers and consumers. In such cases, one major pain point can be in the coordination of the agreed upon message format between producers and consumers.

This example demonstrates how to use Apache Avro to serialize records that are produced to Apache Kafka while allowing evolution of schemas and nonsynchronous update of producer and consumer applications.

Serialization and Deserialization

A Kafka record (formerly called message) consists of a key, a value and headers. Kafka is not aware of the structure of data in records’ key and value. It handles them as byte arrays. But systems that read records from Kafka do care about data in those records. So you need to produce data in a readable format. The data format you use should

  • Be compact
  • Be fast to encode and decode
  • Allow evolution
  • Allow upstream systems (those that write to a Kafka cluster) and downstream systems (those that read from the same Kafka cluster) to upgrade to newer schemas at different times

JSON, for example, is self explanatory but is not a compact data format and is slow to parse. Avro is a fast serialization framework that creates relatively compact output. But to read Avro records, you require the schema that the data was serialized with.

One option is to store and transfer the schema with the record itself. This is fine in a file where you store the schema once and use it for a high number of records. Storing the schema in each and every Kafka record, however, adds significant overhead in terms of storage space and network utilization. An other option is to have an agreed-upon set of identifier-schema mappings and refer to schemas by their identifiers in the record.

From Object to Kafka Record and Back

Producer applications do not need to convert data directly to byte arrays. KafkaProducer is a generic class that needs its user to specify key and value types. Then, producers accept instances of ProducerRecord that have the same type parameters. Conversion from the object to byte array is done by a Serializer. Kafka provides some primitive serializers: for example, IntegerSerializer, ByteArraySerializer, StringSerializer. On consumer side, similar Deserializers convert byte arrays to an object the application can deal with.

So it makes sense to hook in at Serializer and Deserializer level and allow developers of producer and consumer applications to use the convenient interface provided by Kafka. Although latest versions of Kafka allow ExtendedSerializers and ExtendedDeserializers to access headers, we decided to include the schema identifier in Kafka records’ key and value instead of adding record headers.

Avro Essentials

Avro is a data serialization (and remote procedure call) framework. It uses a JSON document called schema to describe data structures. Most Avro use is through either GenericRecord or subclasses of SpecificRecord. Java classes generated from Avro schemas are subclasses of the latter, while the former can be used without prior knowledge of the data structure worked with.

When two schemas satisfy a set of compatibility rules, data written with one schema (called the writer schema) can be read as if it was written with the other one (called the reader schema). Schemas have a canonical form that has all details that are irrelevant for the serialization, such as comments, stripped off to aid equivalence check.

VersionedSchema and SchemaProvider

As mentioned before, we need a one-to-one mapping between schemas and their identifiers. Sometimes it is easier to refer to schemas by names. When a compatible schema is created it can be considered a next version of the schema. Thus we can refer to schemas with a name, version pair. Let’s call the schema, its identifier, name, and version together a VersionedSchema. This object might hold additional metadata the application requires.

public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider objects can look up the instances of VersionedSchema.

public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

How this interface is implemented is covered in “Implementing a Schema Store” in a future blog post.

Serializing Generic Data

When serializing a record, we first need to figure out which Schema to use. Each record has a getSchema method. But finding out the identifier from the schema might be time consuming. It is generally more efficient to set the schema at initialization time. This may be done directly by identifier or by name and version. Furthermore, when producing to multiple topics, we might want to set different schemas for different topics and find out the schema from the topic name supplied as parameter to method serialize(T, String). This logic is omitted in our examples for the sake of brevity and simplicity.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

With the schema in hand, we need to store it in our message. Serializing the ID as part of the message gives us a compact solution, as all of the magic happens in the Serializer/Deserializer. It also enables very easy integration with other frameworks and libraries that already support Kafka and lets the user use their own serializer (such as Spark).

Using this approach, we first write the schema identifier on the first four bytes.

private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Then we can create a DatumWriter and serialize the object.

private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Putting this all together, we have implemented a generic data serializer.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Deserializing Generic Data

Deserialization can work with a single schema (the schema data was written with) but you can specify a different reader schema. The reader schema has to be compatible with the schema that the data was serialized with, but does not need to be equivalent. For this reason, we introduced schema names. We now can specify that we want to read data with specific version of a schema. At initialization time we read desired schema versions per schema name and store metadata in readerSchemasByName for quick access. Now we can read every record written with a compatible version of the schema as if it was written with the specified version.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

When a record needs to be deserialized, we first read the identifier of the writer schema. This enables looking up the reader schema by name. With both schemas available we can create a GeneralDatumReader and read the record.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Dealing with SpecificRecords

More often than not there is one class we want to use for our records. This class is then usually generated from an Avro schema. Apache Avro provides tools to generate Java code from schemas. One such tool is the Avro Maven plugin. Generated classes have the schema they were generated from available at runtime. This makes serialization and deserialization simpler and more effective. For serialization we can use the class to find out about the schema identifier to use.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Thus we do not need the logic to determine schema from topic and data. We use the schema available in the record class to write records.

Similarly, for deserialization, the reader schema can be found out from the class itself. Deserialization logic becomes simpler, because reader schema is fixed at configuration time and does not need to be looked up by schema name.

@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Additional Reading

For more information on schema compatibility, consult the Avro specification for Schema Resolution.

For more information on canonical forms, consult the Avro specification for Parsing Canonical Form for Schemas.

Next Time…

Part 2 will show an implementation of a system to store the Avro schema definitions.

 

Andras Beni
More by this author

Leave a comment

Your email address will not be published. Links are not permitted in comments.