
Working with Kafka Streams

22 Feb 2018

This article is a book excerpt from Apache Kafka 1.0 Cookbook written by Raúl Estrada. This book will simplify real-time data processing by leveraging Apache Kafka 1.0.

In today’s tutorial we are going to discuss how to work with Apache Kafka Streams efficiently.

In the data world, a stream is one of the most important abstractions. A stream represents an unbounded, continuously updating data set, where unbounded means of unknown or unlimited size. By definition, a stream is a fault-tolerant, replayable, and ordered sequence of immutable data records, and a data record is defined as a key-value pair. Before we proceed, some concepts need to be defined:

  • Stream processing application: Any program that uses the Kafka Streams library is known as a stream processing application.
  • Processor topology: This defines the computational logic of the data processing that a stream processing application performs. A topology is a graph of stream processors (nodes) connected by streams (edges). There are two ways to define a topology: via the low-level Processor API or via the Kafka Streams DSL.
  • Stream processor: This is a node in the processor topology. It represents a processing step in the topology and is used to transform data in streams. The standard operations (filter, join, map, and aggregations) are examples of stream processors available in Kafka Streams.
  • Windowing: A stream processor sometimes needs to divide data records into time buckets, that is, to window the stream by time. This is usually required for aggregation and join operations.
  • Join: A join merges two or more streams based on the keys of their data records and generates a new stream. A join over record streams usually has to be performed on a windowed basis.
  • Aggregation: An aggregation takes one input stream and combines multiple input records into a single output record, generating a new stream. Examples of aggregations are sums and counts.
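To make windowing and aggregation more concrete, here is a minimal sketch in plain Java that counts records per key in fixed-size, non-overlapping time windows. It deliberately does not use the Kafka Streams API: the `Event` type, the `countPerWindow` helper, and the `windowStart/key` bucket label are all hypothetical names chosen for illustration, and timestamps are assumed to be non-negative milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WindowedCountSketch {

    // Hypothetical record type: a key plus an event timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Counts events per (window start, key) using fixed-size, non-overlapping windows.
    static Map<String, Long> countPerWindow(List<Event> events, long windowSizeMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Event e : events) {
            // Windowing: assign the record to the time bucket containing its timestamp.
            long windowStart = (e.timestampMs / windowSizeMs) * windowSizeMs;
            String bucket = windowStart + "/" + e.key;
            // Aggregation: combine all records of a bucket into a single count.
            counts.merge(bucket, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("a", 1_000L),
            new Event("a", 4_000L),
            new Event("b", 4_500L),
            new Event("a", 6_000L));
        // Prints {0/a=2, 0/b=1, 5000/a=1}
        System.out.println(countPerWindow(events, 5_000L));
    }
}
```

With a window size of five seconds, the first three events fall into the window starting at 0 and the last one into the window starting at 5000, so the four input records are reduced to three output counts.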

Setting up the project

This recipe sets up the Treu application project to use Kafka Streams.

Getting ready

The project generated in the first four chapters is needed.

How to do it

  1. Open the build.gradle file of the Treu project generated in Chapter 4, Message Enrichment, and add these lines:

apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'
mainClassName = 'treu.StreamingApp'

repositories {
    mavenCentral()
}

version = '0.1.0'

dependencies {
    compile 'org.apache.kafka:kafka-clients:1.0.0'
    compile 'org.apache.kafka:kafka-streams:1.0.0'
    compile 'org.apache.avro:avro:1.7.7'
}

jar {
    manifest {
        attributes 'Main-Class': mainClassName
    }
    from {
        configurations.compile.collect {
            it.isDirectory() ? it : zipTree(it)
        }
    } {
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
    }
}

  2. To rebuild the app, from the project root directory, run this command:

$ gradle jar

The output is something like:

...
BUILD SUCCESSFUL
Total time: 24.234 secs

  3. As the next step, create a file called StreamingApp.java in the src/main/java/treu directory with the following contents:

package treu;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Properties;

public class StreamingApp {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streaming_app_id"); // 1
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 2
        // Keys and values are handled as strings; the default serdes work on
        // byte arrays, which would make the String mapping below fail at runtime.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        StreamsConfig config = new StreamsConfig(props); // 3

        StreamsBuilder builder = new StreamsBuilder(); // 4
        KStream<String, String> simpleFirstStream = builder.stream("src-topic"); // 5
        KStream<String, String> upperCasedStream =
            simpleFirstStream.mapValues(String::toUpperCase); // 6
        upperCasedStream.to("out-topic"); // 7

        // Build the topology only after all the streams have been defined.
        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, config);

        System.out.println("Streaming App Started");
        streams.start();
        Thread.sleep(30000); // 8
        System.out.println("Shutting down the Streaming App");
        streams.close();
    }
}

How it works

Follow the comments in the code:

  • In line //1, APPLICATION_ID_CONFIG sets the identifier of the streaming application, which must be unique within the Kafka cluster.
  • In line //2, BOOTSTRAP_SERVERS_CONFIG specifies the broker to connect to.
  • In line //3, the StreamsConfig object is created from the specified properties.
  • In line //4, the StreamsBuilder object is created. It is used to build a topology.
  • In line //5, a KStream is created with src-topic as its input topic.
  • In line //6, another KStream is created with the contents of src-topic converted to uppercase.
  • In line //7, the uppercase stream is written to out-topic.
  • In line //8, the application is kept running for 30 seconds before it is shut down.
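The transformation in line //6 changes only the value of each record and leaves its key untouched. The following plain-Java sketch mimics that behavior on an in-memory list so that it can be tried without a broker. It is an illustration only, not the Kafka Streams implementation: the `mapValues` helper, the `MapValuesSketch` class, and the keys `k1` to `k3` are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class MapValuesSketch {

    // Applies a function to each record's value and keeps the record's key as it is.
    static <K, V, R> List<Map.Entry<K, R>> mapValues(
            List<Map.Entry<K, V>> records, Function<V, R> mapper) {
        List<Map.Entry<K, R>> result = new ArrayList<>();
        for (Map.Entry<K, V> record : records) {
            result.add(new SimpleEntry<>(record.getKey(), mapper.apply(record.getValue())));
        }
        return result;
    }

    public static void main(String[] args) {
        // In-memory stand-in for the records of src-topic (keys are made up).
        List<Map.Entry<String, String>> srcTopic = new ArrayList<>();
        srcTopic.add(new SimpleEntry<>("k1", "Hello"));
        srcTopic.add(new SimpleEntry<>("k2", "Kafka"));
        srcTopic.add(new SimpleEntry<>("k3", "Streams"));

        // Same value mapping as in line //6 of StreamingApp.
        List<Map.Entry<String, String>> outTopic = mapValues(srcTopic, String::toUpperCase);

        // Prints [k1=HELLO, k2=KAFKA, k3=STREAMS]
        System.out.println(outTopic);
    }
}
```

Because the keys are not modified, a value-only mapping like this one does not change how records are distributed by key.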

Running the streaming application

In the previous recipe, the first version of the streaming app was coded. Now, in this recipe, everything is compiled and executed.

Getting ready

The previous recipe of this chapter must be completed first.


How to do it

The streaming app doesn't receive arguments from the command line:

  1. To build the project, from the treu directory, run the following command:

$ gradle jar

If everything is OK, the output should be:

...
BUILD SUCCESSFUL
Total time: …

  2. To run the project, four different command-line windows are needed. The following diagram shows what the arrangement of command-line windows should look like:

[Diagram: arrangement of the four command-line windows]

  3. In the first command-line terminal, start the Confluent Platform services:

$ <confluent-path>/bin/confluent start

  4. In the second command-line terminal, create the two topics needed:

$ bin/kafka-topics --create --topic src-topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
$ bin/kafka-topics --create --topic out-topic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

  5. In the same command-line terminal, start the producer:

$ bin/kafka-console-producer --broker-list localhost:9092 --topic src-topic

This window is where the input messages are typed.

  6. In the third command-line terminal, start a consumer script listening to out-topic:

$ bin/kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic out-topic

  7. In the fourth command-line terminal, start up the processing application. Go to the project root directory (where the gradle jar command was executed) and run:

$ java -jar ./build/libs/treu-0.1.0.jar

  8. Go to the second command-line terminal (console-producer) and send the following three messages (type each message on a single line and press Enter after each one):

$> Hello [Enter]
$> Kafka [Enter]
$> Streams [Enter]

  9. The messages typed in console-producer should appear in uppercase in the out-topic console consumer window:

> HELLO
> KAFKA
> STREAMS

We discussed Apache Kafka Streams and how to get up and running with it.

If you liked this post, be sure to check out Apache Kafka 1.0 Cookbook, which contains more useful recipes for working with an Apache Kafka installation.
