In today’s tutorial we are going to discuss how to work with Apache Kafka Streams efficiently.
In the data world, a stream is one of the most important abstractions. A stream represents a continuously updating, unbounded data set; here, unbounded means of unknown or unlimited size. By definition, a stream is a fault-tolerant, replayable, and ordered sequence of immutable data records, where a data record is defined as a key-value pair. Before we proceed, some concepts need to be defined:
A stream processing application is any program built with the Kafka Streams library that reads from and writes to Kafka topics.
A processor topology is the graph of stream processors (nodes) connected by streams (edges) that defines the computational logic of the application.
A stream processor is a node in the topology that receives one input record at a time, applies its operation to it, and may produce one or more output records for its downstream processors.
There are two ways to define a topology: through the low-level Processor API, or through the Kafka Streams DSL, which is built on top of the Processor API and is what this chapter uses. For contrast, a sketch of the Processor API route follows.
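Here is a minimal, illustrative sketch of the Processor API route, reusing the src-topic and out-topic names that appear later in this recipe; the PassThroughTopology and PassThroughProcessor names are invented for this example:

package treu;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PassThroughTopology {

    // Illustrative processor that forwards every record unchanged
    static class PassThroughProcessor implements Processor<String, String> {
        private ProcessorContext context;

        @Override public void init(ProcessorContext context) { this.context = context; }
        @Override public void process(String key, String value) { context.forward(key, value); }
        @Override public void punctuate(long timestamp) { } // unused in this sketch
        @Override public void close() { }
    }

    static Topology build() {
        Topology topology = new Topology();
        topology.addSource("Source", "src-topic"); // read from the input topic
        topology.addProcessor("Process", PassThroughProcessor::new, "Source");
        topology.addSink("Sink", "out-topic", "Process"); // write to the output topic
        return topology;
    }
}

The DSL, used in the rest of this chapter, is far more concise for simple transformations like this, so the Processor API is usually reserved for custom, stateful processing logic.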
This recipe sets up the Treu application project to use Kafka Streams.
The project generated in the first four chapters is needed. To set it up for Kafka Streams, modify its build.gradle file as follows:
apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'
mainClassName = 'treu.StreamingApp'
version = '0.1.0'

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:1.0.0'
    compile 'org.apache.kafka:kafka-streams:1.0.0'
    compile 'org.apache.avro:avro:1.7.7'
}

jar {
    manifest {
        attributes 'Main-Class': mainClassName
    }
    // Bundle all compile dependencies into the JAR (a fat JAR), so the app
    // can be launched with java -jar
    from {
        configurations.compile.collect {
            it.isDirectory() ? it : zipTree(it)
        }
    } {
        // Drop signature files from signed dependencies; stale signatures
        // would make the repackaged JAR fail to load
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
    }
}
To check that everything compiles, run the following from the project root:
$ gradle jar
The output is something like:
...
BUILD SUCCESSFUL
Total time: 24.234 secs
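If you want to confirm that the dependencies were bundled into the fat JAR, you can optionally inspect its contents (the JAR path follows from the version set in build.gradle):
$ jar tf ./build/libs/treu-0.1.0.jar | head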
Next, create the file src/main/java/treu/StreamingApp.java with the following content:

package treu;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamingApp {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streaming_app_id"); // 1
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 2
        // Default serdes, so the String key-value pairs below are (de)serialized correctly
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsConfig config = new StreamsConfig(props); // 3
        StreamsBuilder builder = new StreamsBuilder(); // 4

        KStream<String, String> simpleFirstStream = builder.stream("src-topic"); // 5
        KStream<String, String> upperCasedStream =
                simpleFirstStream.mapValues(String::toUpperCase); // 6
        upperCasedStream.to("out-topic"); // 7

        // Build the topology only after all streams have been declared on the builder
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);

        System.out.println("Streaming App Started");
        streams.start();
        Thread.sleep(30000); // 8
        System.out.println("Shutting down the Streaming App");
        streams.close();
    }
}
Follow the numbered comments in the code:
In line //1, APPLICATION_ID_CONFIG is the unique identifier of the app within the Kafka cluster
In line //2, BOOTSTRAP_SERVERS_CONFIG specifies the broker(s) to connect to
In line //3, the StreamsConfig object is created; it is built from the properties specified above
In line //4, the StreamsBuilder object is created; it is used to build the topology
In line //5, a KStream is created, and the input topic is specified
In line //6, another KStream is created with the contents of src-topic, but in uppercase
In line //7, the uppercased stream writes its output to out-topic
In line //8, the application runs for 30 seconds before shutting down (see the sketch below for a more production-ready shutdown)
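Sleeping for a fixed 30 seconds keeps the recipe simple, but a real streaming app normally runs until it is stopped. Here is a minimal sketch of that pattern using a shutdown hook; it assumes the streams variable from the code above and requires one extra import, java.util.concurrent.CountDownLatch:

// Replace the Thread.sleep(30000) and the explicit close() with a latch
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Shutting down the Streaming App");
    streams.close();   // close the streams when the JVM receives SIGINT/SIGTERM
    latch.countDown(); // release the main thread
}));

System.out.println("Streaming App Started");
streams.start();
latch.await(); // block until Ctrl + C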
In the previous recipe, the first version of the streaming app was coded. Now, in this recipe, everything is compiled and executed.
Completion of the previous recipe in this chapter is required.
The streaming app doesn't receive arguments from the command line. First, rebuild the JAR:
$ gradle jar
If everything is OK, the output should be:
...
BUILD SUCCESSFUL
Total time: …
If the Kafka broker is not already running, start the Confluent platform:
$ <confluent-path>/bin/confluent start
Then create the input and output topics:
$ bin/kafka-topics --create --topic src-topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
$ bin/kafka-topics --create --topic out-topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
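To double-check that both topics were created, you can optionally list them with the same tool:
$ bin/kafka-topics --list --zookeeper localhost:2181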
In a new terminal, start a console producer pointing at src-topic:
$ bin/kafka-console-producer --broker-list localhost:9092 --topic src-topic
This window is where the input messages are typed.
In another terminal, start a console consumer reading from out-topic; this window is where the output messages appear:
$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic out-topic
Finally, run the streaming app from the project root:
$ java -jar ./build/libs/treu-0.1.0.jar
Type a few messages in the producer window:
$> Hello [Enter]
$> Kafka [Enter]
$> Streams [Enter]
The uppercased messages should appear in the consumer window:
> HELLO
> KAFKA
> STREAMS
We discussed Apache Kafka Streams and how to get up and running with it.
If you liked this post, be sure to check out Apache Kafka 1.0 Cookbook, which contains more useful recipes for working with Apache Kafka.