Now, in the src/main/java/kioto/spark directory, create a file called SparkProcessor.java with the contents of Listing 8.2, shown as follows:
package kioto.spark;
import kioto.Constants;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.*;
import org.apache.spark.sql.types.*;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.Period;
public class SparkProcessor {
private String brokers;
public SparkProcessor(String brokers) {
this.brokers = brokers;
}
public final void process() {
//below is the content of this method
}
public static void main(String[] args) {
(new SparkProcessor("localhost:9092")).process();
}
}
Listing 8.2: SparkProcessor.java
Note that, as in previous examples, the main method invoked the process() method with the IP address and the port of the Kafka brokers.
Now, let's...