Working with Spark Streaming
Spark Streaming is a library in the Spark ecosystem which addresses real-time processing needs. Spark's batch processing executes the job over large datasets at once, where as Streaming aims for low latency (in hundreds of milliseconds), as data becomes available, it is transformed and processing is done in near real time.
Spark Streaming functions by running jobs over the small batches of data that accumulate in small intervals. It is used for rapid alerting, for supplying dashboards with up-to-date information, as well as for scenarios that need more complex analytics. For example, a common use case in anomaly detection to run K-means clustering on small batches of data and trigger an alert if the cluster center deviates from what is normal.
Tip
For more information, please visit: http://spark.apache.org/docs/latest/streaming-programming-guide.html.
This recipe shows how to work with Spark Streaming and apply stateless transformations and Windowed transformations.
Getting ready
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
How to do it…
- There are two types of transformations supported in Spark Streaming: stateless and stateful (windowed). Let's apply the stateless transformations:
import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream._ import org.apache.spark.SparkConf object StatelessTransformations { def main(args:Array[String]) { val conf= new SparkConf conf.setMaster("spark://master:7077").setAppName("StreamingApp") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(5)) val spamInfoRDD = ssc.sparkContext.textFile("/path/fileName", 2) val lines = ssc.socketTextStream("172.22.225.174", 7777) val mapLines = lines.map(ele => ele+"<<<>>>") val mapLines2 = lines.map(ele => (ele,1)) val errorLines = lines.filter(line =>line.contains("Padma")) val flatMapLines = lines.flatMap(ele => ele.split(",")) val reduceByKeyLines = mapLines2.reduceByKey((a,b) => a+b) val groupByKeyLines = mapLines2.groupByKey().mapValues(names => names.toSet.size) val unionedDstreams = mapLines.union(flatMapLines) val joinedDstream = reduceByKeyLines.join(groupByKeyLines) val cogroupedDStream = reduceByKeyLines.cogroup(groupByKeyLines) val transformLines = lines.transform(rdd => {rdd.union(spamInfoRDD).filter(_.contains("Padma"))}) errorLines.print mapLines.print flatMapLines.print reduceByKeyLines.print groupByKeyLines.print joinedDstream.print cogroupedDStream.print unionedDstreams.print ssc.start ssc.awaitTermination ssc.stop } }
- Now let's apply windowed/stateful transformations:
object StatefulTransformations { def updateRunningSum(values:Seq[Long], state:Option[Long]) Some(state.getOrElse(0L) + values.size) def main(args:Array[String]) { val conf = new SparkConf conf.setMaster("spark://master:7077").setAppName(" Stateful_transformations") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("172.25.41.66", 7777) val windowedLines = lines.window(Seconds(4),Seconds(2)) val mapLines = lines.map(ele => (ele,1L)) val windowCounts = windowedLines.count val countByWindowLines = lines.countByWindow(Seconds(4), Seconds(2)) val reducedLines = lines.reduce(_+_) val updateDStream = mapLines.updateStateByKey (updateRunningSum _) val mapLines = lines.map(ele => (ele,1)) val reducedKeyWindow = mapLines.reduceByKeyAndWindow({(x,y)=> x+y}, {(x,y) => x-y}, Seconds(4), Seconds(2)) windowedLines.print windowCounts.print reducedKeyWindow.print countByWindowLines.print updateDStream.print reducedLines.print ssc.checkpoint("/home/padma/StreamingCheckPoint/") ssc.start ssc.awaitTermination } }
- It's also possible to apply DataFrames, that is, SQL operations on streaming data. The following code snippet shows SQL operations over streams of data:
import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.kafka._ import org.apache.spark.sql._ import org.apache.spark.sql.SQLContext ... object StreamingSQL { case class Words(wordName:String, count:Int) def main(args:Array[String]) { val conf = new SparkConf conf.setAppName("StreamingSQL").setMaster ("spark://master:7077") val sc = new SparkContext(conf) val ssc = new StreamingContext(sc,Seconds(4)) val kafkaParams = Map("test-consumer-group" -> 1) val topicMap = Map("test" -> 1) val kafkaLines = KafkaUtils.createStream(ssc,"blrovh:2181", "test-consumer-group",topicMap) val words = kafkaLines.map{tuple => tuple._2} val wordPairs = words.map(word => (word,1)) val reduceWords = wordPairs.reduceByKey((a,b) => a+b) reduceWords.foreachRDD{ rdd => { val sqlContext = new SQLContext(rdd.sparkContext) import sqlContext.implicits._ val df = rdd.map(record => Words(record._1, record._2)) val dfNew = sqlContext.createDataFrame(df) dfNew.registerTempTable("Words") val data = sqlContext.sql("select wordName from Words") data.foreach(row => println(row.toString)) } } ssc.start ssc.awaitTermination } }
How it works…
The new StreamingContext(SparkContext, Seconds(1))
line instantiates the StreamingContext
, which takes SparkContext
and batch interval as parameters. StreamingContext.socketTextStream(<ip>,<port-number>)
initializes a socket stream, which listens on the specified port for messages. This creates a DStream (discretized stream) which is a sequence of RDDs, being generated for each batch.
When working with windowed/stateful transformations, lines.window(Seconds(4),Seconds(2))
creates a window of 4 seconds and a sliding duration of 2 seconds on the incoming DStream lines. The window-based transformations such as reduceByKeyAndWindow
and countByWindow
use data or intermediate results from previous batches to compute the results of the current batch. The updateStateByKey
transformation constructs DStream (key, state) pairs by the specified function, where this function indicates how to update the state for each key given new values.
In the case of applying SQL operations on streaming data, KafkaUtils.createStream
initializes a DStream from Kafka. It takes StreamingContext
, zookeeperhostname
(blrovh
), port-number
of zookeeper (2181
), consumer-group name
(test-consumer-group) and topic map
(Map("test" -> 1)
) as parameters. The new SQLContext
line creates SQLContext and SQLContext.createDataFrame
creates a data frame for each RDD of the DStream. Using the DataFrame, the SQL queries are executed.
There's more…
Spark Streaming uses a micro-batch architecture, where the streaming computation is a continuous series of batch computations on small batches of data. For each input source, Spark Streaming launches receivers, which are long-running tasks within an application executor, collects the data, replicates it to another executor for fault tolerance, and saves it as RDDs. Using the Kafka Direct Streaming approach, the Dstream is created as KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kakfaParams, topicSet)
. To address the fault tolerance, check-pointing is done, which periodically saves the state to a reliable filesystem.
Also, when running SQL queries on streaming data, there is a method for retaining the data for a specific duration before the query can complete. As in Spark batch processing, streams of data can be persisted either in memory or in disk. In the case of stateful operations, the data is, by default, persistent in memory.
See also
For more details on Spark Streaming, internal architecture, check-pointing, performance tuning, and receiver parallelism, please refer to the following: