Let's create a Kafka AvroConsumer that we will use to receive the input records. As we already know, every Kafka consumer has two prerequisites: it must be a KafkaConsumer and it must set specific properties, as shown in Listing 5.5:
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Properties;
public final class AvroConsumer {
  private Consumer<String, GenericRecord> consumer; //1
  public AvroConsumer(String brokers, String schemaRegistryUrl) { //2
    Properties props = new Properties();
    props.put("group.id", "healthcheck-processor");
    props.put("bootstrap.servers", brokers);
    props.put...