Writing to Kafka from Apache Beam
This recipe shows how to write to Kafka with Apache Beam.
Getting ready
To install Apache Beam, follow the instructions at: https://beam.apache.org/get-started/quickstart-py/.
How to do it...
The following code shows a Beam pipeline step that writes a PCollection of key-value pairs to Kafka. The example illustrates various options for configuring the Beam sink:
PCollection<KV<Long, String>> kvColl = ...;
kvColl.apply(KafkaIO.write()
    .withBootstrapServers("broker_1:9092, broker_2:9092")
    .withTopic("destination-topic")
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
    .updateProducerProperties(
        ImmutableMap.of("compression.type", "gzip"))
);
How it works...
The KafkaIO sink supports writing key-value pairs to a Kafka topic. To configure a Kafka sink, specify the Kafka bootstrap servers and the topic to write to.
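Kafka stores record keys and values as byte arrays, so the key and value coders decide how each pair is turned into bytes before it is sent. The following is a minimal, standard-library-only sketch of that idea. It assumes that a big-endian long coder writes the key as 8 bytes, most significant byte first, and that a UTF-8 string coder writes the value as plain UTF-8 bytes. The class KafkaRecordBytes and its methods are hypothetical names used for illustration; they are not part of Beam or Kafka.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Beam or Kafka.
class KafkaRecordBytes {

    // Encode a long key as 8 bytes, most significant byte first.
    // ByteBuffer uses big-endian byte order by default.
    static byte[] encodeKey(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // Read the 8-byte big-endian key back into a long.
    static long decodeKey(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Encode a string value as UTF-8 bytes.
    static byte[] encodeValue(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Decode UTF-8 bytes back into a string.
    static String decodeValue(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] key = encodeKey(42L);
        byte[] value = encodeValue("hello");
        System.out.println(key.length + " key bytes, " + value.length + " value bytes");
        System.out.println(decodeKey(key) + " -> " + decodeValue(value));
    }
}
```

A consumer reading the destination topic would need to decode keys and values with the matching scheme, which is why the key and value coders are part of the sink configuration.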
KafkaIO allows setting most of the properties in the producer configuration, as the updateProducerProperties call in the example does for compression.type...
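One way to picture a property update is as a map of overrides layered on top of the sink's existing producer settings. The sketch below uses only the Java standard library and assumes that an update replaces any existing entry with the same key. The class ProducerConfigSketch and the method withUpdates are hypothetical names, and the base values are illustrative; this is not Beam's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of layering property updates; not Beam's code.
class ProducerConfigSketch {

    // Return a new map holding the base settings with the updates applied on top.
    // The base map is left unchanged.
    static Map<String, Object> withUpdates(Map<String, Object> base,
                                           Map<String, Object> updates) {
        Map<String, Object> merged = new LinkedHashMap<>(base);
        merged.putAll(updates); // an update wins over a base entry with the same key
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative base settings (assumed values, using real Kafka property names).
        Map<String, Object> base = new LinkedHashMap<>();
        base.put("bootstrap.servers", "broker_1:9092,broker_2:9092");
        base.put("compression.type", "none");

        // The override used in the recipe's example.
        Map<String, Object> updates = new LinkedHashMap<>();
        updates.put("compression.type", "gzip");

        System.out.println(withUpdates(base, updates));
    }
}
```

Returning a new map rather than mutating the base keeps the original settings available for reuse, for example when several sinks share the same brokers but differ in compression.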