Part 3: Configuring Clients
Earlier, we introduced Kafka Serializers and Deserializers capable of writing and reading Kafka records in Avro format. In this part, we will see how to configure producers and consumers to use them.
Setting up a Kafka Topic for use as a Schema Store
KafkaTopicSchemaProvider works with a Kafka topic as its persistent store. This topic will contain at most a few thousand records: the schemas. It does not need multiple partitions, but it must remain available even when one of the brokers is down. That is why we configure it with a replication factor of 3 and at least 2 in-sync replicas.
$ kafka-topics --create --topic __com_cloudera_schemaprovider --partitions 1 --replication-factor 3 --config min.insync.replicas=2 --config retention.ms=-1 --config retention.bytes=-1 --zookeeper $(hostname):2181
Of course, in a production environment, we would set up Apache Sentry rules to allow only certain principals to add schemas.
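The exact Sentry policy depends on the deployment, so we will not spell one out here. Purely as an illustration, a comparable restriction expressed with Kafka's built-in ACL tool might look like the following (the `schema-admin` principal name is a hypothetical stand-in):

```shell
# Allow only the schema-admin principal to write (i.e. add schemas) to the
# schema provider topic; other principals can still read schemas from it.
kafka-acls --authorizer-properties zookeeper.connect=$(hostname):2181 \
  --add --allow-principal User:schema-admin \
  --operation Write \
  --topic __com_cloudera_schemaprovider
```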
As a next step, let’s add a schema to this topic with the administration tool created in Part 2.
$ kafkatopic-schematool --add --name user --version 1 --schema-file ./user_v1_1.avsc --servers $(hostname):9092 --topic __com_cloudera_schemaprovider
Example Schema
The first version of our schema is a simplistic record that captures some attributes of a user.
{"namespace": "com.cloudera.examples.avroserialization",
 "type": "record",
 "name": "User",
 "fields": [
   {"name": "identifier", "type": "string"},
   {"name": "display_name", "type": "string"},
   {"name": "registration_time", "type": "long"}
 ]
}
The later version defines a new field and changes some types. For convenience, we set the record's name to User2 in that schema so that we can generate classes for both versions in the same project. In a real-life scenario, however, User2 would be a later version of the same class rather than a different class coexisting with User.
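The actual User2 schema lives in the accompanying repository; as a hedged illustration of the kind of change described above, a second version might promote a numeric type and add a field with a default (the field name here is hypothetical, and the default is required so that readers of the new schema can still resolve records written with the old one):

```json
{"namespace": "com.cloudera.examples.avroserialization",
 "type": "record",
 "name": "User2",
 "fields": [
   {"name": "identifier", "type": "string"},
   {"name": "display_name", "type": "string"},
   {"name": "registration_time", "type": "double"},
   {"name": "email", "type": "string", "default": ""}
 ]
}
```

Avro's schema resolution rules make both changes safe for readers: a long written by the old schema is promotable to double, and the missing email field is filled in from its default.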
Configuring the Producer
We add some general producer config:
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
then we need to configure our producer to use our Serializer:
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSpecificRecordSerializer.class.getName());
and configure the Serializer to serialize User objects:
producerProps.put(KafkaSpecificRecordSerializer.VALUE_RECORD_CLASSNAME, User.class.getName());
Next, we configure the SchemaProvider to use Kafka for schema storage and set the topic name. We also have to set the bootstrap servers. This allows us to use a different Kafka cluster as the schema provider backend than the one we are producing to.
producerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG, KafkaTopicSchemaProviderFactory.class.getName());
producerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
producerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF, "__com_cloudera_schemaprovider");
We can now use this configuration to create a producer and produce User objects to a Kafka topic.
Producer<Integer, User> producer = new KafkaProducer<>(producerProps);
User u = new User("user3", "User, Third", 0L);
producer.send(new ProducerRecord<>(TOPIC, 42, u)).get();
Configuring the Consumer
We set up the consumer in a similar way. After some general configuration
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "user_reader_group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
we specify our Deserializer and the class it will read. This time we use a different version of the class: User2.
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSpecificRecordDeserializer.class.getName());
consumerProps.put(KafkaSpecificRecordDeserializer.VALUE_RECORD_CLASSNAME, User2.class.getName());
Then we need to set up the Schema provider just like we did above.
consumerProps.put(SchemaUtils.SCHEMA_PROVIDER_FACTORY_CONFIG, KafkaTopicSchemaProviderFactory.class.getName());
consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_TOPIC_NAME_CONF, SCHEMAPROVIDER_TOPIC_NAME);
consumerProps.put(KafkaTopicSchemaProvider.SCHEMA_PROVIDER_CONF_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER);
With this configuration, we can set up a consumer and start polling for records.
KafkaConsumer<Integer, User2> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList(TOPIC));
while (true) {
  consumer.poll(1000).forEach(record -> {
    User2 u = record.value();
    System.out.println(u);
  });
}
Conclusion
We have shown how Avro can be used in conjunction with Kafka to track evolving versions of a schema over time. In situations where different groups manage the data over its lifespan, this technique gives each group a clean way to use schemas to manage the handoff as the data evolves.
The code for this blog post can be found in Cloudera's kafka-examples GitHub repository.