Building a Kafka consumer with Akka
In this recipe, a Kafka consumer is built with Akka.
Getting ready
The Akka connector is published on Maven Central for Scala 2.11. In sbt, add the following dependency:
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.11-M4"
How to do it...
When creating a consumer stream, specify the ConsumerSettings, defining the following:
- Kafka cluster bootstrap server
- Deserializers for the keys and values
- Tuning parameters
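As a rough illustration of what these three kinds of settings amount to, the sketch below collects them as plain Kafka consumer properties using only java.util.Properties. The ConsumerProps object and its build method are hypothetical names introduced here, not part of akka-stream-kafka; the property keys are the standard Kafka consumer configuration keys, and the values are example assumptions.

```scala
import java.util.Properties

// Hypothetical helper for illustration only; not part of akka-stream-kafka.
object ConsumerProps {
  def build(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    // Kafka cluster bootstrap server
    props.setProperty("bootstrap.servers", bootstrapServers)
    // Deserializers for the keys and values
    props.setProperty("key.deserializer",
      "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.setProperty("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    // Other consumer parameters (example values, chosen as assumptions)
    props.setProperty("group.id", groupId)
    props.setProperty("enable.auto.commit", "false")
    props
  }
}
```

For example, ConsumerProps.build("localhost:9092", "group1") returns a Properties object holding the five entries above. ConsumerSettings plays a similar role for the connector, but with typed builder methods instead of raw string keys.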
The imports necessary for ConsumerSettings are as follows:
import akka.kafka._
import akka.kafka.scaladsl._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
Declare and define the ConsumerSettings as follows:
val consumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group1")
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG...