
Implementing fault-tolerance in Spark Streaming data processing applications with Apache Kafka

  • 01 Feb 2018


Note: This article is an excerpt from Apache Spark 2 for Beginners, a book written by Rajanarayanan Thottuvaikkatumana. The book is a developer's guide to building large-scale, distributed data processing applications in a business environment.

Data processing is generally carried out in one of two ways: batch processing or stream processing. This article shows how to process data without interruption, as and when it is generated in real time, and how to build fault-tolerance into that processing.

Message queueing systems with publish-subscribe capability are generally used for processing messages. Traditional message queueing systems could not keep up with the huge volume of messages per second that large-scale data processing applications need to handle.

Kafka is a publish-subscribe messaging system used by many IoT applications to process a huge number of messages. The following capabilities of Kafka made it one of the most widely used messaging systems:

  • Extremely fast: Kafka can process huge amounts of data by handling reads and writes from many application clients in short intervals of time
  • Highly scalable: Kafka is designed to scale up and scale out to form a cluster using commodity hardware
  • Persists a huge number of messages: Messages reaching Kafka topics are persisted to secondary storage while Kafka continues to handle the huge number of messages flowing through

The following are some of the important elements of Kafka, and are terms to be understood before proceeding further:

  • Producer: The real source of the messages, such as weather sensors or mobile phone networks
  • Broker: The Kafka cluster, which receives and persists the messages published to its topics by various producers
  • Consumer: The data processing applications subscribed to the Kafka topics that consume the messages published to the topics

The same log event processing application use case discussed in the preceding section is used again here to elucidate the usage of Kafka with Spark Streaming. Instead of collecting the log event messages from the TCP socket, here the Spark Streaming data processing application will act as a consumer of a Kafka topic and the messages published to the topic will be consumed.

The Spark Streaming data processing application uses version 0.8.2.2 of Kafka as the message broker, and the assumption is that the reader has already installed Kafka, at least in standalone mode. The following activities are to be performed to make sure that Kafka is ready to process the messages produced by the producers and that the Spark Streaming data processing application can consume those messages:

  1. Start the Zookeeper that comes with the Kafka installation.
  2. Start the Kafka server.
  3. Create a topic for the producers to send the messages to.
  4. Pick a Kafka producer and start publishing log event messages to the newly created topic.
  5. Use the Spark Streaming data processing application to process the log events published to the newly created topic.

Starting Zookeeper and Kafka

The following scripts are run from separate terminal windows in order to start Zookeeper and the Kafka broker, and to create the required Kafka topics:

$ cd $KAFKA_HOME
$ $KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties
[2016-07-24 09:01:30,196] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

$ $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2016-07-24 09:05:06,381] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-07-24 09:05:06,455] INFO [Kafka Server 0], started (kafka.server.KafkaServer)

$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sfb
Created topic "sfb".

$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb

The Kafka message producer can be any application capable of publishing messages to the Kafka topics. Here, the kafka-console-producer that comes with Kafka is used as the producer of choice. Once the producer starts running, whatever is typed into its console window will be treated as a message that is published to the chosen Kafka topic. The Kafka topic is given as a command line argument when starting the kafka-console-producer.

The submission of the Spark Streaming data processing application that consumes log event messages produced by the Kafka producer is slightly different from the application covered in the preceding section. Here, many Kafka jar files are required for the data processing. Since they are not part of the Spark infrastructure, they have to be submitted to the Spark cluster. The following jar files are required for the successful running of this application:

  • $KAFKA_HOME/libs/kafka-clients-0.8.2.2.jar
  • $KAFKA_HOME/libs/kafka_2.11-0.8.2.2.jar
  • $KAFKA_HOME/libs/metrics-core-2.2.0.jar
  • $KAFKA_HOME/libs/zkclient-0.3.jar
  • Code/Scala/lib/spark-streaming-kafka-0-8_2.11-2.0.0-preview.jar
  • Code/Python/lib/spark-streaming-kafka-0-8_2.11-2.0.0-preview.jar

In the preceding list of jar files, the Maven repository coordinate for spark-streaming-kafka-0-8_2.11-2.0.0-preview.jar is "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0-preview". This particular jar file has to be downloaded and placed in the lib folder of the directory structure given in Figure 4. It is used in the submit.sh and submitPy.sh scripts, which submit the application to the Spark cluster. The download URL for this jar file is given in the reference section of this chapter.

In the submit.sh and submitPy.sh files, the last few lines contain a conditional statement looking for the second parameter value of 1 to identify this application and ship the required jar files to the Spark cluster.
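The contents of submit.sh and submitPy.sh are not reproduced in this article. As a rough sketch of what shipping the jar files amounts to, spark-submit accepts a comma-separated list of jar files through its --jars option. The helper name build_submit_command and the /opt/kafka location below are assumptions made for illustration only, not part of the book's scripts:

```python
# Jar files from the Kafka installation, relative to KAFKA_HOME (taken from the list above)
KAFKA_JARS = [
    "libs/kafka-clients-0.8.2.2.jar",
    "libs/kafka_2.11-0.8.2.2.jar",
    "libs/metrics-core-2.2.0.jar",
    "libs/zkclient-0.3.jar",
]
# Spark's Kafka integration jar for the Python application (taken from the list above)
SPARK_KAFKA_JAR = "Code/Python/lib/spark-streaming-kafka-0-8_2.11-2.0.0-preview.jar"


def build_submit_command(kafka_home, app_file):
    """Return a spark-submit argument list that ships the Kafka jars.

    Hypothetical helper: the real submitPy.sh may be organised differently.
    """
    jars = ["%s/%s" % (kafka_home, jar) for jar in KAFKA_JARS]
    jars.append(SPARK_KAFKA_JAR)
    # --jars takes one comma-separated value
    return ["spark-submit", "--jars", ",".join(jars), app_file]


# Assumption: Kafka is installed under /opt/kafka
command = build_submit_command("/opt/kafka", "KafkaStreamingApps.py")
print(" ".join(command))
```

Keeping the command as a list of arguments, rather than one string, avoids quoting problems if it is later passed to subprocess.run.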

Implementing the application in Scala

The following code snippet is the Scala code for the log event processing application that processes the messages produced by the Kafka producer. The use case of this application is the same as the one discussed in the preceding section concerning windowing operations:

/**
 * The following program can be compiled and run using SBT.
 * Wrapper scripts have been provided with this.
 * The following script can be run to compile the code:
 *   ./compile.sh
 * The following script can be used to run this application in Spark.
 * The second command line argument of value 1 is very important. It flags
 * the shipping of the Kafka jar files to the Spark cluster:
 *   ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1
 */
package com.packtpub.sfb

import java.util.HashMap
import org.apache.spark.streaming._
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.kafka._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}

object KafkaStreamingApps {
  def main(args: Array[String]) {
    // Log level settings
    LogSettings.setLogLevels()
    // Variables used for creating the Kafka stream
    // The quorum of Zookeeper hosts
    val zooKeeperQuorum = "localhost"
    // Message group name
    val messageGroup = "sfb-consumer-group"
    // Kafka topics list, separated by commas if there are multiple topics to be listened on
    val topics = "sfb"
    // Number of threads per topic
    val numThreads = 1
    // Create the Spark Session and the spark context
    val spark = SparkSession
      .builder
      .appName(getClass.getSimpleName)
      .getOrCreate()
    // Get the Spark context from the Spark session for creating the streaming context
    val sc = spark.sparkContext
    // Create the streaming context with a 10-second batch interval
    val ssc = new StreamingContext(sc, Seconds(10))
    // Set the checkpoint directory for saving the data to recover when there is a crash
    ssc.checkpoint("/tmp")
    // Create the map of topic names to the number of threads
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    // Create the Kafka stream and keep only the message value of each (key, value) pair
    val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
    // Keep each log message line containing the word ERROR
    val errorLines = appLogLines.filter(line => line.contains("ERROR"))
    // Print the lines containing the error
    errorLines.print()
    // Count the number of messages by the windows and print them
    errorLines.countByWindow(Seconds(30), Seconds(10)).print()
    // Start the streaming
    ssc.start()
    // Wait till the application is terminated
    ssc.awaitTermination()
  }
}

Compared to the Scala code in the preceding section, the major difference is in the way the stream is created.

Implementing the application in Python

The following code snippet is the Python code for the log event processing application that processes the messages produced by the Kafka producer. The use case of this application is also the same as the one discussed in the preceding section concerning windowing operations:

# The following script can be used to run this application in Spark
# ./submitPy.sh KafkaStreamingApps.py 1

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    # Create the Spark context
    sc = SparkContext(appName="PythonStreamingApp")
    # Necessary log4j logging level settings are done
    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.WARN)
    # Create the Spark Streaming Context with 10 seconds batch interval
    ssc = StreamingContext(sc, 10)
    # Set the checkpoint directory for saving the data to recover when there is a crash
    ssc.checkpoint("tmp")
    # The quorum of Zookeeper hosts
    zooKeeperQuorum = "localhost"
    # Message group name
    messageGroup = "sfb-consumer-group"
    # Kafka topics list, separated by commas if there are multiple topics to be listened on
    topics = "sfb"
    # Number of threads per topic
    numThreads = 1
    # Create a Kafka DStream
    kafkaStream = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, {topics: numThreads})
    # Keep only the message value of each (key, value) pair
    appLogLines = kafkaStream.map(lambda x: x[1])
    # Keep each log message line containing the word ERROR
    errorLines = appLogLines.filter(lambda appLogLine: "ERROR" in appLogLine)
    # Print the first ten elements of each RDD generated in this DStream to the console
    errorLines.pprint()
    # Count the number of messages by the windows and print them
    errorLines.countByWindow(30, 10).pprint()
    # Start the streaming
    ssc.start()
    # Wait till the application is terminated
    ssc.awaitTermination()

The following commands are run on the terminal window to run the Scala application:

$ cd Scala
$ ./submit.sh com.packtpub.sfb.KafkaStreamingApps 1

The following commands are run on the terminal window to run the Python application:

$ cd Python
$ ./submitPy.sh KafkaStreamingApps.py 1

While either of the preceding programs is running, any log event messages typed into the console window of the Kafka console producer, started with the following command, are processed by the application. The outputs of this program will be very similar to the ones given in the preceding section:

$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sfb
[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] Directory index forbidden by rule: /home/raj/
[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] Directory index forbidden by rule: /apache/web/test
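To make the expected behaviour concrete, the following plain-Python sketch (no Spark involved) mimics what the application computes for the three sample log lines: the ERROR filter followed by a 30-second window sliding every 10 seconds, which with a 10-second batch interval covers the last three batches. The assumption that all three lines arrive within the same batch, and the function names, are illustrative only:

```python
from collections import deque

# The three sample log lines typed into the console producer
SAMPLE_LINES = [
    "[Fri Dec 20 01:46:23 2015] [ERROR] [client 1.2.3.4.5.6] "
    "Directory index forbidden by rule: /home/raj/",
    "[Fri Dec 20 01:46:23 2015] [WARN] [client 1.2.3.4.5.6] "
    "Directory index forbidden by rule: /home/raj/",
    "[Fri Dec 20 01:54:34 2015] [ERROR] [client 1.2.3.4.5.6] "
    "Directory index forbidden by rule: /apache/web/test",
]


def error_lines(batch):
    """Keep only the lines containing the word ERROR, like the filter step."""
    return [line for line in batch if "ERROR" in line]


def windowed_error_counts(batches, window_batches=3):
    """Count ERROR lines over a window of the last `window_batches` batches,
    emitting one count per batch (window 30s, slide 10s, batch 10s)."""
    window = deque(maxlen=window_batches)
    counts = []
    for batch in batches:
        window.append(len(error_lines(batch)))
        counts.append(sum(window))
    return counts


# Assumption: all three lines land in the first batch, followed by three empty batches
batches = [SAMPLE_LINES, [], [], []]
counts = windowed_error_counts(batches)
print(counts)  # [2, 2, 2, 0]
```

The two ERROR lines stay in the count for three consecutive windows and drop out once the batch that contained them slides out of the 30-second window.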

Spark provides two approaches to process Kafka streams. The first one is the receiver-based approach that was discussed previously and the second one is the direct approach.

The direct approach is a simplified method in which Spark Streaming uses the capabilities of Kafka just like any other consumer of a Kafka topic: it polls for the messages in a specific topic and partition, addressed by the offset numbers of the messages. Depending on the batch interval of the Spark Streaming data processing application, it picks up a certain range of offsets from the Kafka cluster, and this range of offsets is processed as a batch. This is highly efficient and well suited to processing messages with an exactly-once requirement. Because each batch is defined by an offset range that can be read again from Kafka after a failure, the Spark Streaming library needs to do less additional work to implement exactly-once semantics and relies on the messages retained by Kafka instead. The programming constructs of this approach are slightly different in the APIs used for the data processing. Consult the appropriate reference material for the details.
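The idea of processing a range of offsets as a batch can be sketched in plain Python, without Spark or Kafka. Here a partition is modelled as a list whose index is the offset; the function names and the max_per_batch limit are assumptions made for illustration (in the Spark 0.8 integration, the corresponding entry point is KafkaUtils.createDirectStream, and batch boundaries are decided by Spark rather than by a fixed count):

```python
# One Kafka partition modelled as an append-only list; the list index is the offset
partition_log = ["msg-%d" % i for i in range(7)]


def read_offset_range(log, from_offset, until_offset):
    """Return the records in [from_offset, until_offset)."""
    return log[from_offset:until_offset]


def plan_batches(log_end_offset, start_offset, max_per_batch):
    """Split the unread offsets into consecutive (from, until) ranges, one per batch."""
    ranges = []
    offset = start_offset
    while offset < log_end_offset:
        until = min(offset + max_per_batch, log_end_offset)
        ranges.append((offset, until))
        offset = until
    return ranges


ranges = plan_batches(len(partition_log), start_offset=0, max_per_batch=3)
print(ranges)  # [(0, 3), (3, 6), (6, 7)]

# A failed batch can be re-read with the same offset range and yields the same records
first_attempt = read_offset_range(partition_log, *ranges[1])
replayed = read_offset_range(partition_log, *ranges[1])
print(first_attempt == replayed)  # True
```

Since a batch is fully described by its offset range, replaying it after a failure reads exactly the same records, which is what makes this approach a good fit for exactly-once processing.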

The preceding sections introduced the concept of the Spark Streaming library and discussed some real-world use cases. From a deployment perspective, there is a big difference between Spark data processing applications developed to process static batch data and those developed to process dynamic stream data. Applications that process a stream of data must be available at all times. In other words, such applications should not have components that are single points of failure. The following section discusses this topic.


Spark Streaming jobs in production

When a Spark Streaming application is processing the incoming data, it is very important to have uninterrupted data processing capability so that all the data that is getting ingested is processed. In business-critical streaming applications, most of the time missing even one piece of data can have a huge business impact. To deal with such situations, it is important to avoid single points of failure in the application infrastructure. From a Spark Streaming application perspective, it is good to understand how the underlying components in the ecosystem are laid out so that the appropriate measures can be taken to avoid single points of failure.

A Spark Streaming application deployed in a cluster such as Hadoop YARN, Mesos or Spark Standalone mode has two main components very similar to any other type of Spark application:

  • Spark driver: This contains the application code written by the user
  • Executors: The processes that execute the jobs submitted by the Spark driver

In addition, the executors have a component called a receiver, which receives the data ingested as a stream and saves it as blocks of data in memory. While a receiver is receiving the data and forming the data blocks, the blocks are replicated to another executor for fault-tolerance. In other words, in-memory replication of the data blocks is done onto a different executor. At the end of every batch interval, these data blocks are combined to form the next batch of the DStream and sent out for further processing downstream.

Figure 1 depicts the components working together in a Spark Streaming application infrastructure deployed in a cluster:

[Figure 1: Components of a Spark Streaming application infrastructure deployed in a cluster]

In Figure 1, there are two executors. The receiver component is deliberately not displayed in the second executor to show that it is not using the receiver and instead just collects the replicated data blocks from the other executor. But when needed, such as on the failure of the first executor, the receiver in the second executor can start functioning.

Implementing fault-tolerance in Spark Streaming data processing applications

The infrastructure of a Spark Streaming data processing application has many moving parts. A failure can happen to any one of them, resulting in the interruption of the data processing. Typically, failures happen to the Spark driver or the executors.

When an executor fails, the task of receiving the data stream is taken over by the executor to which the data was being replicated, since replication happens on a regular basis. Even so, there are situations in which an executor failure causes all the data that has been received but not yet processed to be lost. To circumvent this problem, the data blocks can be persisted into HDFS or Amazon S3 in the form of write-ahead logs.
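The principle behind a write-ahead log can be sketched in plain Python: every received block is appended to durable storage before it is kept in memory, so that it can be read back after a failure. This is only a conceptual model with hypothetical names (WriteAheadLog, receive) and a local temporary file standing in for HDFS or S3. In Spark Streaming itself, receiver write-ahead logs are switched on with the configuration property spark.streaming.receiver.writeAheadLog.enable and are written under the checkpoint directory:

```python
import json
import os
import tempfile


class WriteAheadLog:
    """Append-only log of received blocks, one JSON document per line."""

    def __init__(self, path):
        self.path = path

    def append(self, block):
        with open(self.path, "a", encoding="utf-8") as log_file:
            log_file.write(json.dumps(block) + "\n")
            log_file.flush()
            os.fsync(log_file.fileno())  # force the block onto durable storage

    def recover(self):
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as log_file:
            return [json.loads(line) for line in log_file if line.strip()]


def receive(block, wal, memory):
    """Log the block first, then keep it in memory for processing."""
    wal.append(block)
    memory.append(block)


# A local temporary directory stands in for HDFS or S3 in this sketch
wal_path = os.path.join(tempfile.mkdtemp(), "receiver.wal")
wal = WriteAheadLog(wal_path)
executor_memory = []
receive(["[ERROR] first"], wal, executor_memory)
receive(["[WARN] second", "[ERROR] third"], wal, executor_memory)

executor_memory = []  # simulate the executor failing before processing
recovered = WriteAheadLog(wal_path).recover()
print(recovered)
```

The order inside receive matters: if the block were kept in memory first and logged afterwards, a failure between the two steps would lose it.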

When the Spark driver fails, the driver program is stopped, all the executors lose their connection to it, and they stop functioning. This is the most dangerous situation. To deal with it, some configuration and code changes are necessary.

The Spark driver has to be configured for automatic driver restart, which is supported by the cluster managers. This requires submitting the Spark job in cluster mode, whichever cluster manager is used. When the driver restarts, a checkpointing mechanism has to be in place in the driver program so that it can resume from the point where it crashed. This has already been done in the code samples that are used. The following lines of code do that job:

ssc = StreamingContext(sc, 10)
ssc.checkpoint("tmp")

From an application coding perspective, the way the StreamingContext is created is slightly different. Instead of creating a new StreamingContext every time, the factory method getOrCreate of the StreamingContext is to be used with a function, as shown in the following code segment. If that is done, when the driver is restarted, the factory method checks the checkpoint directory to see whether an earlier StreamingContext was in use and, if one is found in the checkpoint data, recreates it from that data. Otherwise, a new StreamingContext is created.

The following code snippet gives the definition of a function that can be used with the getOrCreate factory method of the StreamingContext. As mentioned earlier, a detailed treatment of these aspects is beyond the scope of this book:

/**
 * The following function has to be used when the code is being
 * restructured to have checkpointing and driver recovery.
 * The way it should be used is to use the StreamingContext.getOrCreate
 * with this function and do a start of that.
 */
def sscCreateFn(): StreamingContext = {
  // Variables used for creating the Kafka stream
  // The quorum of Zookeeper hosts
  val zooKeeperQuorum = "localhost"
  // Message group name
  val messageGroup = "sfb-consumer-group"
  // Kafka topics list, separated by commas if there are multiple topics to be listened on
  val topics = "sfb"
  // Number of threads per topic
  val numThreads = 1
  // Create the Spark Session and the spark context
  val spark = SparkSession
    .builder
    .appName(getClass.getSimpleName)
    .getOrCreate()
  // Get the Spark context from the Spark session for creating the streaming context
  val sc = spark.sparkContext
  // Create the streaming context with a 10-second batch interval
  val ssc = new StreamingContext(sc, Seconds(10))
  // Create the map of topic names to the number of threads
  val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
  // Create the Kafka stream and keep only the message value of each (key, value) pair
  val appLogLines = KafkaUtils.createStream(ssc, zooKeeperQuorum, messageGroup, topicMap).map(_._2)
  // Keep each log message line containing the word ERROR
  val errorLines = appLogLines.filter(line => line.contains("ERROR"))
  // Print the lines containing the error
  errorLines.print()
  // Count the number of messages by the windows and print them
  errorLines.countByWindow(Seconds(30), Seconds(10)).print()
  // Set the checkpoint directory for saving the data to recover when there is a crash
  ssc.checkpoint("/tmp")
  // Return the streaming context
  ssc
}
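The function above is only half of the pattern; the other half is the call to getOrCreate itself. The following is a minimal PySpark sketch of both halves, reusing the settings of the Python listing in this article. It assumes a Spark 2.x installation where the pyspark.streaming.kafka module is available; the names create_context and run are illustrative, and the PySpark imports are placed inside the functions only so that the module can be loaded on a machine without PySpark:

```python
CHECKPOINT_DIR = "tmp"  # same checkpoint directory as the Python listing


def create_context():
    """Build a new StreamingContext; called only when no checkpoint data exists."""
    # Imported lazily so the module loads even where PySpark is not installed
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingApp")
    ssc = StreamingContext(sc, 10)
    # The whole DStream pipeline must be defined inside this function
    kafka_stream = KafkaUtils.createStream(
        ssc, "localhost", "sfb-consumer-group", {"sfb": 1})
    error_lines = (kafka_stream
                   .map(lambda key_value: key_value[1])
                   .filter(lambda line: "ERROR" in line))
    error_lines.pprint()
    error_lines.countByWindow(30, 10).pprint()
    ssc.checkpoint(CHECKPOINT_DIR)
    return ssc


def run():
    """Recover the context from the checkpoint data if present, else create it."""
    from pyspark.streaming import StreamingContext

    ssc = StreamingContext.getOrCreate(CHECKPOINT_DIR, create_context)
    ssc.start()
    ssc.awaitTermination()
```

Calling run() from the driver program on every start, including restarts triggered by the cluster manager, lets the application pick up from the checkpoint data when it exists.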

At the data source level, it is a good idea to build in parallelism for faster data processing and, depending on the source of data, this can be accomplished in different ways. Kafka inherently supports partitioning at the topic level, and that kind of scale-out mechanism supports a good amount of parallelism. As a consumer of Kafka topics, the Spark Streaming data processing application can have multiple receivers by creating multiple streams, and the data generated by those streams can be combined by the union operation on the Kafka streams.
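A minimal PySpark sketch of the multiple-receiver idea follows. It assumes the receiver-based KafkaUtils.createStream API used earlier in this article and the union method of StreamingContext; the function names and the choice of three receivers are illustrative. The stream factory is passed in as a parameter so that the combining logic can be exercised without a running cluster:

```python
def kafka_receiver_stream(ssc):
    """Create one receiver-based Kafka stream with the settings used in this article."""
    # Imported lazily so the module loads even where PySpark is not installed
    from pyspark.streaming.kafka import KafkaUtils
    return KafkaUtils.createStream(
        ssc, "localhost", "sfb-consumer-group", {"sfb": 1})


def create_parallel_stream(ssc, create_stream=kafka_receiver_stream, num_receivers=3):
    """Create several input streams (one receiver each) and union them into one DStream."""
    streams = [create_stream(ssc) for _ in range(num_receivers)]
    return ssc.union(*streams)
```

The unified stream can then be filtered and windowed exactly like the single stream. Note that the sfb topic in this article was created with a single partition, so additional receivers would only help once the topic has more partitions to read from.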

The production deployment of Spark Streaming data processing applications is to be done purely based on the type of application that is being used. Some of the guidelines given previously are just introductory and conceptual in nature. There is no silver bullet approach to solving production deployment problems, and they have to evolve along with the application development.

To summarize, we looked at the production deployment of Spark Streaming data processing applications and the possible ways of implementing fault-tolerance in Spark Streaming data processing applications using Kafka.

To explore other equally important Spark tools such as Spark GraphX, Spark MLlib, and DataFrames, check out Apache Spark 2 for Beginners and learn to develop efficient large-scale applications with Apache Spark.
