Robust Message Serialization in Apache Kafka Using Apache Avro, Part 3

Part 3: Configuring Clients

Earlier, we introduced Kafka Serializers and Deserializers that are capable of writing and reading Kafka records in Avro format. In this part we are going to see how to configure producers and consumers to use them.

Setting Up a Kafka Topic for Use as a Schema Store

KafkaTopicSchemaProvider works with a Kafka topic as its persistent store. This topic will contain at most a few thousand records: the schemas. It does not need multiple partitions, but it needs to be available even when one of the brokers is down. That’s why we configure it with a replication factor of 3 and at least 2 in-sync replicas.

$ kafka-topics --create --topic __com_cloudera_schemaprovider \
      --partitions 1 --replication-factor 3 --config min.insync.replicas=2 \
      --config retention.ms=-1 --config retention.bytes=-1 \
      --zookeeper $(hostname):2181

Of course, in a production environment, we would set up Apache Sentry rules to allow only certain principals to add schemas.
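With Sentry’s kafka-sentry tool, such rules could grant write access on the schema topic to a dedicated role. The sketch below is illustrative only; the role name schemaprovider_admin and group schema-admins are made up for this example:

# role and group names below are illustrative
$ kafka-sentry -cr -r schemaprovider_admin
$ kafka-sentry -arg -r schemaprovider_admin -g schema-admins
$ kafka-sentry -gpr -r schemaprovider_admin \
      -p "Host=*->Topic=__com_cloudera_schemaprovider->action=write"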

As a next step, let’s add a schema to this topic with the administration tool created in Part 2.

$ kafkatopic-schematool --add --name user --version 1 \
      --schema-file ./user_v1_1.avsc --servers $(hostname):9092 \
      --topic __com_cloudera_schemaprovider


Example Schema

The first version of our schema is a simplistic record that captures some attributes of a user.

{"namespace": "com.cloudera.examples.avroserialization",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "identifier", "type": "string"},
    {"name": "display_name", "type": "string"},
    {"name": "registration_time", "type": "long"}
  ]
}

The later version defines a new field and changes some types. For convenience, we set the record’s name to User2 in that schema so that we can generate classes for both versions in the same project. In a real-life scenario, however, User2 would be a later version of the same class rather than a different class coexisting with User.
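To give an idea of what such an evolution might look like, here is a hypothetical version-2 schema; the optional email field is made up for this illustration, and the actual schema used in this series can be found in the example repository.

{"namespace": "com.cloudera.examples.avroserialization",
  "type": "record",
  "name": "User2",
  "fields": [
    {"name": "identifier", "type": "string"},
    {"name": "display_name", "type": "string"},
    {"name": "registration_time", "type": "long"},
    {"name": "email", "type": ["null", "string"], "default": null,
     "doc": "hypothetical field added for illustration"}
  ]
}

Note that a field added in a later version needs a default value so that records written with the earlier schema can still be read, and any type changes must follow Avro’s schema resolution rules.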

Configuring the Producer

We add some general producer config:

Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    IntegerSerializer.class.getName());


then we need to configure our producer to use our Serializer:

producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    KafkaSpecificRecordSerializer.class.getName());

and configure the Serializer to serialize User objects:

producerProps.put(KafkaSpecificRecordSerializer.VALUE_RECORD_CLASSNAME,
    User.class.getName());

Next we configure the SchemaProvider to use Kafka for schema storage and set the topic name. We also have to set bootstrap servers; this allows us to use a different Kafka cluster as the schema provider backend than the one we are producing to.

producerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG,
    KafkaTopicSchemaProviderFactory.class.getName());
producerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX +
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
producerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF,
    "__com_cloudera_schemaprovider");


We can now use this configuration to create a producer and produce a User object to a Kafka topic.

Producer<Integer, User> producer = new KafkaProducer<>(producerProps);
User u = new User("user3", "User, Third", 0L);
producer.send(new ProducerRecord<>(TOPIC, 42, u)).get();


Configuring the Consumer

We set up a consumer in quite a similar way. After some general configuration,

Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "user_reader_group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    IntegerDeserializer.class.getName());

we specify our Deserializer and the class it will read. We use a different version of the class: User2.

consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    KafkaSpecificRecordDeserializer.class.getName());
consumerProps.put(KafkaSpecificRecordDeserializer.VALUE_RECORD_CLASSNAME,
    User2.class.getName());


Then we need to set up the schema provider just as we did above.

consumerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG,
    KafkaTopicSchemaProviderFactory.class.getName());
consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF,
    SCHEMAPROVIDER_TOPIC_NAME);
consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX +
    CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);


With this configuration, we can set up a consumer and start polling for records.

KafkaConsumer<Integer, User2> consumer = new KafkaConsumer<>(consumerProps);

consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  consumer.poll(1000).forEach(record -> {
    User2 u = record.value();
    System.out.println(u);
  });
}

Conclusion

We have shown how Avro can be used in conjunction with Kafka to track evolving versions of a schema over time. In situations where data is managed by different groups over its lifespan, this technique provides a clean way for each group to use schemas to manage the handoff as the data evolves.

The code for this blog post can be found in Cloudera’s kafka-examples GitHub repository.


Andras Beni