Let's create a Kafka consumer that we will use to receive the custom input messages.
To use the deserializer in our consumer, we must meet the two requirements that apply to every Kafka consumer: it must be a KafkaConsumer, and it must set specific properties, such as those in Listing 4.15.
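As a minimal sketch of the second requirement, the properties can be assembled with plain java.util.Properties before they are handed to a KafkaConsumer. The class name ConsumerPropsSketch, the helper consumerProps, and the values localhost:9092 and example-group are assumptions made for illustration; they are not part of the book's code. The deserializers are given as class-name strings, so the sketch compiles without the Kafka client library on the classpath.

```java
import java.util.Properties;

// Hypothetical helper class, not part of the kioto project.
final class ConsumerPropsSketch {

    // Collects the settings a consumer of HealthCheck messages would need.
    // The keys are standard Kafka consumer configuration names.
    static Properties consumerProps(String brokers, String groupId) {
        Properties props = new Properties();
        // Where to find the Kafka cluster.
        props.put("bootstrap.servers", brokers);
        // Consumer group this consumer joins (value supplied by the caller).
        props.put("group.id", groupId);
        // Keys are plain strings, so Kafka's built-in deserializer is enough.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Values are HealthCheck objects, so the custom deserializer is named here.
        props.put("value.deserializer", "kioto.serde.HealthCheckDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumed broker address and group id, for illustration only.
        Properties props = consumerProps("localhost:9092", "example-group");
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

A consumer needs deserializers rather than serializers, because it turns the bytes read from a topic back into objects; the custom class is named in value.deserializer so that each record value arrives as a HealthCheck.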
The following is the content of Listing 4.15, the constructor method for CustomConsumer:
import java.util.Properties;
import kioto.HealthCheck;
import kioto.serde.HealthCheckDeserializer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.serialization.StringDeserializer;
public final class CustomConsumer {
  private Consumer<String, HealthCheck> consumer;
  public CustomConsumer(String brokers) {
    Properties props = new Properties();
    props.put("group.id", "healthcheck-processor...