Note: This article is an excerpt from the book Apache Spark 2 for Beginners by Rajanarayanan Thottuvaikkatumana, a developer's guide to building large-scale, distributed data processing applications for the business environment.
Data processing is generally carried out in two ways: batch processing or stream processing. This article will help you learn how to process your data without interruption and to build in fault tolerance as the data is generated in real time.
Message queueing systems with publish-subscribe capability are generally used for processing messages. Traditional message queueing systems, however, failed to keep up with the huge number of messages that large-scale data processing applications need to process per second.
Kafka is a publish-subscribe messaging system used by many IoT applications to process a huge number of messages. The capabilities that made Kafka one of the most widely used messaging systems include persistent, durable message storage, very high throughput, a distributed and horizontally scalable architecture, and support for many concurrent producers and consumers.
Some important Kafka terms to understand before proceeding further are the producer (the application that publishes messages to a topic), the broker (the Kafka server that receives messages from producers and serves them to consumers), the topic (a named category to which messages are published), and the consumer (an application that subscribes to one or more topics and processes the published messages).
The same log event processing application use case discussed in the preceding section is used again here to elucidate the usage of Kafka with Spark Streaming. Instead of collecting the log event messages from the TCP socket, here the Spark Streaming data processing application acts as a consumer of a Kafka topic and processes the messages published to that topic.
The Spark Streaming data processing application uses version 0.8.2.2 of Kafka as the message broker, and the assumption is that the reader has already installed Kafka, at least in standalone mode. A few activities must be performed to make sure that Kafka is ready to process the messages produced by the producers and that the Spark Streaming data processing application can consume those messages.
The following scripts are run from separate terminal windows in order to start Zookeeper and the Kafka broker, and to create the required Kafka topics:
$ cd $KAFKA_HOME
$ $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
[2016-07-24 09:01:30,196] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
$ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2016-07-24 09:05:06,381] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-07-24 09:05:06,455] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sfb
Created topic "sfb".
$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb
The Kafka message producer can be any application capable of publishing messages to the Kafka topics. Here, the kafka-console-producer that comes with Kafka is used as the producer of choice. Once the producer starts running, whatever is typed into its console window will be treated as a message that is published to the chosen Kafka topic. The Kafka topic is given as a command line argument when starting the kafka-console-producer.
The submission of the Spark Streaming data processing application that consumes the log event messages produced by the Kafka producer is slightly different from the application covered in the preceding section. Here, a number of Kafka jar files are required for the data processing, and since they are not part of the Spark infrastructure, they have to be shipped to the Spark cluster along with the application.
Among these jar files, the Maven repository coordinate for spark-streaming-kafka-0-8_2.11-2.0.0-preview.jar is "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview". This particular jar file has to be downloaded and placed in the lib folder of the directory structure given in Figure 4. It is used in the submit.sh and submitPy.sh scripts, which submit the application to the Spark cluster. The download URL for this jar file is given in the reference section of this chapter.
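If the dependency is managed through SBT rather than by downloading the jar manually, the same coordinate could be declared in the project's build.sbt roughly as follows; this is only a sketch, and the Scala and Spark versions must match the cluster being used:

// Sketch: declaring the Spark Streaming Kafka 0.8 integration as an SBT dependency
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview"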
In the submit.sh and submitPy.sh files, the last few lines contain a conditional statement looking for the second parameter value of 1 to identify this application and ship the required jar files to the Spark cluster.
The following code snippet is the Scala code for the log event processing application that processes the messages produced by the Kafka producer. The use case of this application is the same as the one discussed in the preceding section concerning windowing operations:
/**
The following program can be compiled and run using SBT.
Wrapper scripts have been provided with this.
The following script can be run to compile the code:
./compile.sh
The following script can be used to run this application in Spark. The second
command line argument of value 1 is very important. It flags the shipping of
the Kafka jar files to the Spark cluster:
./submit.sh com.packtpub.sfb.KafkaStreamingApps 1
**/
package com.packtpub.sfb
import java.util.HashMap
import org.apache.spark.streaming._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
object KafkaStreamingApps {
  def main(args: Array[String]) {
    // Log level settings
    LogSettings.setLogLevels()
    // Variables used for creating the Kafka stream
    // The quorum of Zookeeper hosts
    val zooKeeperQuorum = "localhost"
    // Message group name
    val messageGroup = "sfb-consumer-group"
    // Kafka topics list, separated by commas if there are multiple topics to be listened on
    val topics = "sfb"
    // Number of threads per topic
    val numThreads = 1
    // Create the Spark session and the Spark context
    val spark = SparkSession
      .builder
      .appName(getClass.getSimpleName)
      .getOrCreate()
    // Get the Spark context from the Spark session for creating the streaming context
    val sc = spark.sparkContext
    // Create the streaming context
    val ssc = new StreamingContext(sc, Seconds(10))
    // Set the checkpoint directory for saving the data to recover when there is a crash
    ssc.checkpoint("/tmp")
    // Create the map of topic names
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Create the Kafka stream
    val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
    // Keep each log message line containing the word ERROR
    val errorLines = appLogLines.filter(line => line.contains("ERROR"))
    // Print the lines containing the error
    errorLines.print()
    // Count the number of messages by the windows and print them
    errorLines.countByWindow(Seconds(30), Seconds(10)).print()
    // Start the streaming
    ssc.start()
    // Wait till the application is terminated
    ssc.awaitTermination()
  }
}
Compared to the Scala code in the preceding section, the major difference is in the way the stream is created.
The following code snippet is the Python code for the log event processing application that processes the messages produced by the Kafka producer. The use case of this application is also the same as the one discussed in the preceding section concerning windowing operations:
# The following script can be used to run this application in Spark
# ./submitPy.sh KafkaStreamingApps.py 1
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == "__main__":
    # Create the Spark context
    sc = SparkContext(appName="PythonStreamingApp")
    # Necessary log4j logging level settings are done
    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.WARN)
    # Create the Spark Streaming context with a 10-second batch interval
    ssc = StreamingContext(sc, 10)
    # Set the checkpoint directory for saving the data to recover when there is a crash
    ssc.checkpoint("tmp")
    # The quorum of Zookeeper hosts
    zooKeeperQuorum = "localhost"
    # Message group name
    messageGroup = "sfb-consumer-group"
    # Kafka topics list, separated by commas if there are multiple topics to be listened on
    topics = "sfb"
    # Number of threads per topic
    numThreads = 1
    # Create a Kafka DStream
    kafkaStream = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, {topics: numThreads})
    # Extract the message payload from the (key, message) tuples in the Kafka stream
    appLogLines = kafkaStream.map(lambda x: x[1])
    # Keep each log message line containing the word ERROR
    errorLines = appLogLines.filter(lambda appLogLine: "ERROR" in appLogLine)
    # Print the first ten elements of each RDD generated in this DStream to the console
    errorLines.pprint()
    # Count the number of messages by the windows and print them
    errorLines.countByWindow(30, 10).pprint()
    # Start the streaming
    ssc.start()
    # Wait till the application is terminated
    ssc.awaitTermination()
The following commands are run on the terminal window to run the Scala application:
$ cd Scala
$ ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1
The following commands are run on the terminal window to run the Python application:
$ cd Python
$ ./submitPy.sh KafkaStreamingApps.py 1
With the preceding programs running, whatever log event messages are typed into the console window of the Kafka console producer, started with the following command and fed with the following inputs, will be processed by the application. The output of the program will be very similar to the output given in the preceding section:
$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb
[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /apache/web/test
Spark provides two approaches to processing Kafka streams. The first is the receiver-based approach discussed previously, and the second is the direct approach.
The direct approach to processing Kafka messages is a simplified method in which Spark Streaming uses the full capabilities of Kafka, just like any other Kafka topic consumer, and polls for the messages in a specific topic and partition by the offset number of the messages. Depending on the batch interval of the Spark Streaming data processing application, it picks up a range of offsets from the Kafka cluster, and that range of offsets is processed as one batch. This is highly efficient and ideal for messages that must be processed exactly once. The method also reduces the work the Spark Streaming library has to do to implement exactly-once semantics and delegates that responsibility to Kafka. The programming constructs of this approach are slightly different in the APIs used for the data processing; consult the appropriate reference material for the details.
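Purely as an illustration, and not taken from the book's own listings, a direct stream over the same sfb topic could be created roughly as follows with the spark-streaming-kafka-0-8 API, assuming the ssc streaming context from the earlier listing and a broker running on localhost:9092:

// Sketch: a direct (receiver-less) Kafka stream using the 0.8 integration API
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._
// Broker list and topic set are assumptions matching the examples in this article
val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val topicSet = Set("sfb")
// createDirectStream returns (key, message) tuples; keep only the message payload
val appLogLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet).map(_._2)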
The preceding sections introduced the Spark Streaming library and discussed some real-world use cases. From a deployment perspective, there is a big difference between Spark data processing applications developed to process static batch data and those developed to process dynamic stream data. Applications that process a stream of data must be available constantly; in other words, such applications should not have components that are single points of failure. The following section discusses this topic.
When a Spark Streaming application is processing the incoming data, it is very important to have uninterrupted data processing capability so that all the data that is getting ingested is processed. In business-critical streaming applications, most of the time missing even one piece of data can have a huge business impact. To deal with such situations, it is important to avoid single points of failure in the application infrastructure. From a Spark Streaming application perspective, it is good to understand how the underlying components in the ecosystem are laid out so that the appropriate measures can be taken to avoid single points of failure.
A Spark Streaming application deployed in a cluster such as Hadoop YARN, Mesos, or Spark Standalone mode has two main components, very similar to any other type of Spark application: the Spark driver, which runs the user-written program, and the executors, which run on the worker nodes and carry out the tasks scheduled by the driver.
But the executors have an additional component called a receiver that receives the data getting ingested as a stream and saves it as blocks of data in memory. When one receiver is receiving the data and forming the data blocks, they are replicated to another executor for fault-tolerance. In other words, in-memory replication of the data blocks is done onto a different executor. At the end of every batch interval, these data blocks are combined to form a DStream and sent out for further processing downstream.
Figure 1 depicts the components working together in a Spark Streaming application infrastructure deployed in a cluster:
In Figure 1, there are two executors. The receiver component is deliberately not displayed in the second executor to show that it is not using the receiver and instead just collects the replicated data blocks from the other executor. But when needed, such as on the failure of the first executor, the receiver in the second executor can start functioning.
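The replication of received blocks described above is governed by the storage level of the receiver. As an illustrative sketch only, reusing the ssc, zooKeeperQuorum, messageGroup, and topicMap values from the earlier listing, the receiver-based stream could state this explicitly; MEMORY_AND_DISK_SER_2, the default for this API, keeps the serialized blocks on two executors:

// Sketch: make the replicated storage level of the Kafka receiver explicit
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka._
val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup,
  topicMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)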
Spark Streaming data processing application infrastructure has many moving parts. Failures can happen to any one of them, resulting in the interruption of the data processing. Typically failures can happen to the Spark driver or the executors.
When an executor fails, since the data is replicated on a regular basis, the task of receiving the data stream is taken over by the executor onto which the data was being replicated. However, there are situations in which, when an executor fails, the data that has been received but not yet processed is lost. To circumvent this problem, the data blocks can also be persisted into HDFS or Amazon S3 in the form of write-ahead logs.
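Write-ahead logging is enabled through configuration rather than code. The following is only a sketch: the property name is the standard Spark Streaming one, while the checkpoint path shown is a placeholder. The receiver write-ahead log is switched on when the session is built, and the log is written under the checkpoint directory, which should then point to a fault-tolerant file system such as HDFS:

// Sketch: enable the receiver write-ahead log and checkpoint to a fault-tolerant store
val spark = SparkSession
  .builder
  .appName(getClass.getSimpleName)
  .config("spark.streaming.receiver.writeAheadLog.enable", "true")
  .getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
// The hdfs:// path is a placeholder; any HDFS or S3 location works
ssc.checkpoint("hdfs:///user/spark/checkpoint")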
When the Spark driver fails, the driver program is stopped, all the executors lose their connection, and they stop functioning. This is the most dangerous situation. To deal with it, some configuration and code changes are necessary.
The Spark driver has to be configured for automatic restart, which is supported by the cluster managers. This includes a change in the Spark job submission method to use cluster deploy mode, whichever the cluster manager may be. For the application to resume from the point at which it crashed when the driver is restarted, a checkpointing mechanism has to be implemented in the driver program. This has already been done in the code samples used here. The following lines of code do that job:
ssc = StreamingContext(sc, 10)
ssc.checkpoint("tmp")
From an application coding perspective, the way the StreamingContext is created is slightly different. Instead of creating a new StreamingContext every time, the factory method getOrCreate of StreamingContext is to be used with a function, as shown in the following code segment. When the driver is restarted, the factory method checks the checkpoint directory to see whether an earlier StreamingContext was in use; if one is found in the checkpoint data, it is recreated from that checkpoint, otherwise a new StreamingContext is created.
The following code snippet gives the definition of a function that can be used with the getOrCreate factory method of StreamingContext. As mentioned earlier, a detailed treatment of these aspects is beyond the scope of this book:
/**
* The following function has to be used when the code is being restructured
* to have checkpointing and driver recovery.
* The way it should be used is to call StreamingContext.getOrCreate with this
* function and then start the returned streaming context.
*/
def sscCreateFn(): StreamingContext = {
  // Variables used for creating the Kafka stream
  // The quorum of Zookeeper hosts
  val zooKeeperQuorum = "localhost"
  // Message group name
  val messageGroup = "sfb-consumer-group"
  // Kafka topics list, separated by commas if there are multiple topics to be listened on
  val topics = "sfb"
  // Number of threads per topic
  val numThreads = 1
  // Create the Spark session and the Spark context
  val spark = SparkSession
    .builder
    .appName(getClass.getSimpleName)
    .getOrCreate()
  // Get the Spark context from the Spark session for creating the streaming context
  val sc = spark.sparkContext
  // Create the streaming context
  val ssc = new StreamingContext(sc, Seconds(10))
  // Create the map of topic names
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  // Create the Kafka stream
  val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
  // Keep each log message line containing the word ERROR
  val errorLines = appLogLines.filter(line => line.contains("ERROR"))
  // Print the lines containing the error
  errorLines.print()
  // Count the number of messages by the windows and print them
  errorLines.countByWindow(Seconds(30), Seconds(10)).print()
  // Set the checkpoint directory for saving the data to recover when there is a crash
  ssc.checkpoint("/tmp")
  // Return the streaming context
  ssc
}
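For completeness, a minimal usage sketch, not part of the book's listing, of wiring this function into the driver might look like the following, where the checkpoint directory passed to getOrCreate must match the one set inside the function:

// Sketch: recreate the StreamingContext from the checkpoint on driver restart,
// or build a fresh one with sscCreateFn if no checkpoint exists
val ssc = StreamingContext.getOrCreate("/tmp", sscCreateFn _)
ssc.start()
ssc.awaitTermination()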
At the data source level, it is a good idea to build in parallelism for faster data processing and, depending on the source of data, this can be accomplished in different ways. Kafka inherently supports partitioning at the topic level, and that kind of scaling-out mechanism supports a good amount of parallelism. As a consumer of Kafka topics, the Spark Streaming data processing application can have multiple receivers by creating multiple streams, and the data generated by those streams can be combined by a union operation on the Kafka streams, as sketched below.
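The following is an illustrative sketch only, reusing the ssc, zooKeeperQuorum, messageGroup, and topicMap values from the earlier listings; the receiver count is an assumption and should be tuned to the number of topic partitions and executors:

// Sketch: multiple Kafka receiver streams combined with union for higher ingestion parallelism
val numReceivers = 3  // assumption; tune to the number of topic partitions and executors
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
}
val unifiedLogLines = ssc.union(kafkaStreams)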
The production deployment of Spark Streaming data processing applications has to be driven by the type of application being built. Some of the guidelines given previously are introductory and conceptual in nature; there is no silver-bullet approach to solving production deployment problems, and the deployment has to evolve along with the application development.
To summarize, we looked at the production deployment of Spark Streaming data processing applications and the possible ways of implementing fault-tolerance in Spark Streaming and data processing applications using Kafka.
To explore more critical and equally important Spark tools such as Spark GraphX, Spark MLlib, and DataFrames, do check out Apache Spark 2 for Beginners and develop efficient large-scale applications with Apache Spark.