Now, let's solve the problem of counting how many events are in each window. For this, we will use Kafka Streams. When we do this type of analysis, it is said that we are doing streaming aggregation.
In the src/main/java/kioto/events directory, create a file called EventProcessor.java with the contents of Listing 6.6, shown as follows:
package kioto.events;
import ...
public final class EventProcessor {
private final String brokers;
private EventProcessor(String brokers) {
this.brokers = brokers;
}
private void process() {
// ...
}
public static void main(String[] args) {
(new EventProcessor("localhost:9092")).process();
}
}
Listing 6.6: EventProcessor.java
All the processing logic is contained in the process() method. The first step is to create a StreamsBuilder to create the KStream, shown as follows:
StreamsBuilder streamsBuilder...