Spark Streaming

  • 11 min read
  • 06 Jul 2017

In this article by Romeo Kienzler, the author of the book Mastering Apache Spark 2.x - Second Edition, we will see that Spark Streaming is a stream-processing module within Apache Spark. It uses the Spark cluster to offer the ability to scale to a high degree. Being based on Spark, it is also highly fault tolerant, having the ability to rerun failed tasks by checkpointing the data stream that is being processed. After an initial section providing a practical overview of how Apache Spark processes stream-based data, the following areas will be covered in this article:

  • Error recovery and checkpointing
  • TCP-based stream processing
  • File streams
  • Kafka stream source

For each topic, we will provide a worked example in Scala, and will show how the stream-based architecture can be set up and tested.

Overview

The following diagram shows potential data sources for Apache Streaming, such as Kafka, Flume, and HDFS:
[Figure: Spark Streaming data sources, processing, and output targets]

These feed into the Spark Streaming module and are processed as Discretized Streams (DStreams). The diagram also shows that other Spark module functionality, such as machine learning, can be used to process the stream-based data. The fully processed data can then be output to HDFS, databases, or dashboards. This diagram is based on the one at the Spark Streaming website, but we wanted to extend it to express the Spark module functionality:

[Figure: a continuous input stream broken into DStream batches, with a window over the DStream (from http://spark.apache.org/)]

When discussing Spark Discretized Streams, the previous figure, again taken from the Spark website at http://spark.apache.org/, is the diagram we like to use. The green boxes in the previous figure show the continuous data stream sent to Spark being broken down into a Discretized Stream (DStream). The size of each element in the stream is then based on a batch time, which might be two seconds. It is also possible to create a window, shown as the red box in the previous figure, over the DStream. For instance, when carrying out trend analysis in real time, it might be necessary to determine the top ten Twitter-based hashtags over a ten-minute window.

So, given that Spark can be used for stream processing, how is a stream created? The following Scala-based code shows how a Twitter stream can be created. This example is simplified because Twitter authorization has not been included, but you get the idea.

The Spark streaming context (ssc) is created using the Spark context, sc. A batch time is specified when it is created; in this case, 5 seconds. A Twitter-based DStream, called stream, is then created from the StreamingContext using a window of 60 seconds:

val ssc    = new StreamingContext(sc, Seconds(5) )
val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )
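Note that TwitterUtils is not part of core Spark; for Spark 2.x the Twitter connector is published by the Apache Bahir project, so a standalone build needs it on the classpath. A hedged sbt sketch (the version shown is only illustrative and should match your Spark release) would be:

libraryDependencies += "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0"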

The stream processing can be started with the stream context start method (shown next), and the awaitTermination method indicates that it should process until stopped. So, if this code is embedded in a library-based application, it will run until the session is terminated, perhaps with a Ctrl + C:

ssc.start()
ssc.awaitTermination()
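To make the windowing idea from the previous figure concrete, the following is a minimal sketch, not taken from the book, of the ten-minute hashtag trend analysis mentioned earlier. It assumes a fresh, un-windowed Twitter DStream and the twitter4j Status objects that the connector delivers, and it would be declared before ssc.start() is called:

val tweets   = TwitterUtils.createStream(ssc, None)
val hashTags = tweets.flatMap( status => status.getText.split(" ").filter( _.startsWith("#") ) )

val topCounts = hashTags.map( tag => (tag, 1) )
                        .reduceByKeyAndWindow( _ + _, Minutes(10) )   // count per hashtag over a ten-minute window
                        .map( _.swap )
                        .transform( rdd => rdd.sortByKey(false) )     // sort by count, descending

topCounts.foreachRDD( rdd => rdd.take(10).foreach( x => println("Hashtag : " + x) ) )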

This explains what Spark Streaming is, and what it does, but it does not explain error handling, or what to do if your stream-based application fails. The next section will examine Spark Streaming error management and recovery.

Errors and recovery

Generally, the question that needs to be asked for your application is: is it critical that you receive and process all the data? If not, then on failure you might just be able to restart the application and discard the missing or lost data. If this is not the case, then you will need to use checkpointing, which will be described in the next section.

It is also worth noting that your application's error management should be robust and self-sufficient. What we mean by this is that if an exception is non-critical, then manage the exception, perhaps log it, and continue processing. For instance, when a task reaches the maximum number of failures (specified by spark.task.maxFailures), it will terminate processing.
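If your processing is critical, you can raise that threshold when building the Spark configuration. A minimal sketch, where the value of 8 is only illustrative (the Spark default is 4):

val conf = new SparkConf()
conf.setAppName("Resilient stream example")
conf.set("spark.task.maxFailures", "8")   // allow more task retries before the job is failed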

Checkpointing

It is possible to set up an HDFS-based checkpoint directory to store Apache Spark-based streaming information. In this Scala example, data will be stored in HDFS, under /data/spark/checkpoint. The following HDFS file system ls command shows that before starting, the directory does not exist:

[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint

ls: `/data/spark/checkpoint': No such file or directory

The Twitter-based Scala code sample given next starts by defining a package name for the application, and by importing the Spark, Spark Streaming, and Twitter-based functionality. It then defines an application object named stream1:

package nz.co.semtechsolutions

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._

object stream1 {

Next, a method called createContext is defined, which will be used to create both the Spark and streaming contexts. It will also checkpoint the stream to the HDFS-based directory using the streaming context checkpoint method, which takes a directory path as a parameter, that path being the value (cpDir) passed into the createContext method:

 
  def createContext( cpDir : String ) : StreamingContext = {

    val appName = "Stream example 1"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5) )

    ssc.checkpoint( cpDir )

    ssc
  }

Now, the main method is defined, as is the HDFS directory, as well as the Twitter access authorization parameters. The Spark streaming context ssc is either retrieved or created using the HDFS checkpoint directory via the StreamingContext method getOrCreate. If the directory doesn't exist, then the previous method, createContext, is called, which will create the context and the checkpoint. Obviously, we have truncated our own Twitter auth.keys in this example for security reasons:

  def main(args: Array[String]) {

    val hdfsDir = "/data/spark/checkpoint"

    val consumerKey       = "QQpxx"
    val consumerSecret    = "0HFzxx"
    val accessToken       = "323xx"
    val accessTokenSecret = "IlQxx"

    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    val ssc = StreamingContext.getOrCreate(hdfsDir,
      () => { createContext( hdfsDir ) })

    val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )

    // do some processing

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end stream1

Having run this code, which has no actual processing, the HDFS checkpoint directory can be checked again. This time it is apparent that the checkpoint directory has been created, and the data has been stored:

[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint

Found 1 items
drwxr-xr-x   - hadoop supergroup          0 2015-07-02 13:41 /data/spark/checkpoint/0fc3d94e-6f53-40fb-910d-1eef044b12e9

This example, taken from the Apache Spark website, shows how checkpoint storage can be set up and used. But how often is checkpointing carried out? The metadata is stored during each stream batch. The actual data is stored with a period that is the maximum of the batch interval and ten seconds. This might not be ideal for you, so you can reset the value using this method:

 DStream.checkpoint( newRequiredInterval )

Here, newRequiredInterval is the new checkpoint interval value that you require; generally, you should aim for a value that is five to ten times your batch interval. Checkpointing saves both the stream batch and the metadata (data about the data).
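For example, with the five-second batch interval used earlier, a value in that range could be set on the Twitter DStream as follows (30 seconds is six times the batch interval; adjust it to suit your own batch time):

stream.checkpoint( Seconds(30) )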

If the application fails, then, when it restarts, the checkpointed data is used when processing is started. The batch data that was being processed at the time of failure is reprocessed, along with the batched data received since the failure. Remember to monitor the HDFS disk space being used for checkpointing.
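One simple way to keep an eye on that usage is the HDFS du command against the checkpoint directory:

[hadoop@hc2nn stream]$ hdfs dfs -du -h /data/spark/checkpoint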

In the next section, we will begin to examine the streaming sources, and will provide some examples of each type.

Streaming sources

We will not be able to cover all the stream types with practical examples in this section, but where this article is too small to include code, we will at least provide a description. In this article, we will cover the TCP and file streams, and the Flume, Kafka, and Twitter streams. We will start with a practical TCP-based example, but it is also worth considering the stream processing architecture itself.

For instance, what happens in cases where the stream data delivery rate exceeds the potential data processing rate? Systems such as Kafka offer a way to address this issue by supporting multiple data topics and consumers.
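As a taste of what such a source looks like, here is a hedged sketch of creating a Kafka-based DStream using the Kafka 0.10 direct stream connector; the broker address, consumer group, and topic name are placeholders, and the extra spark-streaming-kafka-0-10 dependency is assumed to be on the classpath:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",               // placeholder broker list
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "stream-example",               // placeholder consumer group
  "auto.offset.reset"  -> "latest"
)

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("mytopic"), kafkaParams) )

val lines = kafkaStream.map( record => record.value )     // the message payloads as a DStream[String]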

TCP stream

It is possible to use the Spark streaming context method socketTextStream to stream data via TCP/IP, by specifying a hostname and a port number. The Scala-based code example in this section will receive data on port 10777, supplied using the netcat Linux command. The code sample starts by defining the package name and importing the Spark, context, and streaming classes. The object class named stream2 is defined, as is the main method with its arguments:

package nz.co.semtechsolutions

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object stream2 {

  def main(args: Array[String]) {

The number of arguments passed to the class is checked to ensure that it is the hostname and the port number. A Spark configuration object is created with an application name defined. The Spark and streaming contexts are then created. Then, a streaming batch time of 10 seconds is set:

    if ( args.length < 2 )
      {
        System.err.println("Usage: stream2 <host> <port>")
        System.exit(1)
      }

    val hostname = args(0).trim
    val portnum  = args(1).toInt

    val appName = "Stream example 2"
    val conf    = new SparkConf()

    conf.setAppName(appName)

    val sc  = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10) )

A DStream called rawDstream is created by calling the socketTextStream method of the streaming context, using the hostname and port number parameters:

val rawDstream = ssc.socketTextStream( hostname, portnum )

A top-ten word count is created from the raw stream data by splitting each line on spaces. Then a (key, value) pair is created as (word, 1), which is reduced by the key value, this being the word. So now, there is a list of words and their associated counts. The key and value are then swapped, so the list becomes (count, word). Next, a sort is done on the key, which is now the count. Finally, the top 10 items in the RDD, within the DStream, are taken and printed out:

val wordCount = rawDstream
                       .flatMap(line => line.split(" "))
                       .map(word => (word,1))
                       .reduceByKey(_+_)
                       .map(item => item.swap)
                       .transform(rdd => rdd.sortByKey(false))
                       .foreachRDD( rdd =>
                         { rdd.take(10).foreach(x=>println("List : " + x)) })

The code closes with the Spark Streaming start and awaitTermination methods being called to start the stream processing and await process termination:

    ssc.start()
    ssc.awaitTermination()

  } // end main

} // end stream2

The data for this application is provided, as we stated previously, by the Linux netcat (nc) command. The Linux cat command dumps the contents of a log file, which is piped to nc. The -lk options force netcat to listen for connections, and to keep on listening if the connection is lost. This example shows that the port being used is 10777:

[root@hc2nn log]# pwd
/var/log

[root@hc2nn log]# cat ./anaconda.storage.log | nc -lk 10777

The output from this TCP-based stream processing is shown here. The actual output is not as important as the method demonstrated. However, the data shows, as expected, a list of 10 log file words in descending count order. Note that the top word is empty because the stream was not filtered for empty words; a one-line fix is sketched after the output:

List : (17104,)
List : (2333,=)
List : (1656,:)
List : (1603,;)
List : (1557,DEBUG)
List : (564,True)
List : (495,False)
List : (411,None)
List : (356,at)
List : (335,object)
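As noted before the output, the empty string tops the list. A hedged one-line addition to the pipeline above, placed between the flatMap and map calls, would drop those empty tokens before counting:

                       .flatMap(line => line.split(" "))
                       .filter(word => word.nonEmpty)    // drop empty tokens produced by repeated spaces
                       .map(word => (word,1))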

This is interesting if you want to stream data using Apache Spark Streaming based upon TCP/IP from a host and port. But what about more exotic methods? What if you wish to stream data from a messaging system, or via memory-based channels? What if you want to use some of the big data tools available today, such as Flume and Kafka? The next sections will examine these options, but first I will demonstrate how streams can be based upon files.
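As a brief preview of the file-based case, a minimal sketch would use the streaming context's textFileStream method, which watches a directory (the path here is only an example) and turns each newly arrived text file into a stream of lines:

val fileStream = ssc.textFileStream( "/data/spark/incoming" )
fileStream.foreachRDD( rdd => println( "Lines in batch : " + rdd.count() ) )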

Summary

We could have provided streaming examples for systems such as Kinesis, as well as queuing systems, but there was not room in this article. This article has provided practical examples of data recovery via checkpointing in Spark Streaming. It has also touched on the performance limitations of checkpointing and shown that the checkpointing interval should be set at five to ten times the Spark stream batch interval.
