Apache Spark for Data Science Cookbook

You're reading from   Apache Spark for Data Science Cookbook Solve real-world analytical problems

Product type Paperback
Published in Dec 2016
Publisher
ISBN-13 9781785880100
Length 392 pages
Edition 1st Edition
Authors (2):
Padma Priya Chitturi
Nagamallikarjuna Inelu

Working with Spark Streaming

Spark Streaming is a library in the Spark ecosystem that addresses real-time processing needs. Spark's batch processing executes a job over a large dataset at once, whereas Streaming aims for low latency (in the hundreds of milliseconds): as data becomes available, it is transformed and processed in near real time.

Spark Streaming works by running jobs over the small batches of data that accumulate during short intervals. It is used for rapid alerting, for supplying dashboards with up-to-date information, and for scenarios that need more complex analytics. For example, a common use case in anomaly detection is to run K-means clustering on small batches of data and trigger an alert if a cluster center deviates from what is normal.
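
The batching idea described above can be illustrated without a cluster. The following is a minimal plain-Scala sketch, not Spark's receiver implementation: the `Event` type, the `toBatches` helper, and the sample timestamps are all hypothetical names introduced here for illustration. It assigns each record to the batch interval in which it arrived, which is roughly what happens before a job is run on each batch.

```scala
object MicroBatchSketch {
  // Hypothetical event type: arrival time in milliseconds plus a payload
  final case class Event(timeMs: Long, payload: String)

  // Assigns each event to the batch whose interval contains its arrival time
  // and returns the batches ordered by batch index.
  def toBatches(events: Seq[Event], batchIntervalMs: Long): Seq[(Long, Seq[String])] =
    events
      .groupBy(e => e.timeMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (batch, es) => (batch, es.sortBy(_.timeMs).map(_.payload)) }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(100, "a"), Event(4900, "b"), Event(5000, "c"), Event(12000, "d"))
    // With a 5-second interval, "a" and "b" share batch 0, "c" falls in batch 1, "d" in batch 2
    toBatches(events, 5000L).foreach { case (batch, payloads) =>
      println(s"batch $batch: ${payloads.mkString(", ")}")
    }
  }
}
```

A shorter interval lowers latency but produces more, smaller batches, which is the trade-off behind the batch interval passed to StreamingContext in the recipe.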

This recipe shows how to work with Spark Streaming and apply stateless and windowed (stateful) transformations.

Getting ready

To step through this recipe, you will need a running Spark cluster, either in pseudo-distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. There are two types of transformations supported in Spark Streaming: stateless and stateful (windowed). Let's apply the stateless transformations:
           import org.apache.spark.{SparkConf, SparkContext}
           import org.apache.spark.SparkContext._
           import org.apache.spark.streaming._
           import org.apache.spark.streaming.StreamingContext._
           import org.apache.spark.streaming.dstream._

           object StatelessTransformations {
             def main(args: Array[String]): Unit = {
               val conf = new SparkConf()
                 .setMaster("spark://master:7077")
                 .setAppName("StreamingApp")
               val sc = new SparkContext(conf)
               // Batch interval of 5 seconds
               val ssc = new StreamingContext(sc, Seconds(5))
               // Static RDD that is combined with every batch in transform below
               val spamInfoRDD = ssc.sparkContext.textFile("/path/fileName", 2)
               val lines = ssc.socketTextStream("172.22.225.174", 7777)

               // Each transformation is applied to every batch independently
               val mapLines = lines.map(ele => ele + "<<<>>>")
               val mapLines2 = lines.map(ele => (ele, 1))
               val errorLines = lines.filter(line => line.contains("Padma"))
               val flatMapLines = lines.flatMap(ele => ele.split(","))
               val reduceByKeyLines = mapLines2.reduceByKey((a, b) => a + b)
               val groupByKeyLines = mapLines2.groupByKey().mapValues(names => names.toSet.size)
               val unionedDstreams = mapLines.union(flatMapLines)
               val joinedDstream = reduceByKeyLines.join(groupByKeyLines)
               val cogroupedDStream = reduceByKeyLines.cogroup(groupByKeyLines)
               // transform exposes the underlying RDD of each batch
               val transformLines = lines.transform(rdd =>
                 rdd.union(spamInfoRDD).filter(_.contains("Padma")))

               errorLines.print()
               mapLines.print()
               flatMapLines.print()
               reduceByKeyLines.print()
               groupByKeyLines.print()
               joinedDstream.print()
               cogroupedDStream.print()
               unionedDstreams.print()
               transformLines.print()
               ssc.start()
               ssc.awaitTermination()
               ssc.stop()
             }
           }
    
  2. Now let's apply windowed/stateful transformations:
           import org.apache.spark.{SparkConf, SparkContext}
           import org.apache.spark.streaming._

           object StatefulTransformations {
             // Adds the number of new values for a key to its running count
             def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
               Some(state.getOrElse(0L) + values.size)

             def main(args: Array[String]): Unit = {
               val conf = new SparkConf()
                 .setMaster("spark://master:7077")
                 .setAppName("Stateful_transformations")
               val sc = new SparkContext(conf)
               val ssc = new StreamingContext(sc, Seconds(1))
               // Checkpointing is required by updateStateByKey and by window
               // operations that use an inverse function; set it before start
               ssc.checkpoint("/home/padma/StreamingCheckPoint/")
               val lines = ssc.socketTextStream("172.25.41.66", 7777)
               val windowedLines = lines.window(Seconds(4), Seconds(2))
               val mapLines = lines.map(ele => (ele, 1L))
               val windowCounts = windowedLines.count()
               val countByWindowLines = lines.countByWindow(Seconds(4), Seconds(2))
               val reducedLines = lines.reduce(_ + _)
               val updateDStream = mapLines.updateStateByKey(updateRunningSum _)
               // The second function is the inverse of the first; it removes the
               // values of batches that slide out of the window
               val reducedKeyWindow = mapLines.reduceByKeyAndWindow(
                 (x: Long, y: Long) => x + y, (x: Long, y: Long) => x - y,
                 Seconds(4), Seconds(2))

               windowedLines.print()
               windowCounts.print()
               reducedKeyWindow.print()
               countByWindowLines.print()
               updateDStream.print()
               reducedLines.print()
               ssc.start()
               ssc.awaitTermination()
             }
           }
    
  3. It's also possible to apply DataFrames, that is, SQL operations on streaming data. The following code snippet shows SQL operations over streams of data:
            import org.apache.spark.{SparkConf, SparkContext}
            import org.apache.spark.streaming.{Seconds, StreamingContext}
            import org.apache.spark.streaming.StreamingContext._
            import org.apache.spark.streaming.kafka._
            import org.apache.spark.sql._
            import org.apache.spark.sql.SQLContext
            ...
            object StreamingSQL {
              case class Words(wordName: String, count: Int)

              def main(args: Array[String]): Unit = {
                val conf = new SparkConf()
                  .setAppName("StreamingSQL")
                  .setMaster("spark://master:7077")
                val sc = new SparkContext(conf)
                val ssc = new StreamingContext(sc, Seconds(4))
                // Topic name -> number of consumer threads for that topic
                val topicMap = Map("test" -> 1)
                // Receiver-based stream: ZooKeeper quorum, consumer group, topics
                val kafkaLines = KafkaUtils.createStream(ssc, "blrovh:2181",
                  "test-consumer-group", topicMap)
                // Each record is a (key, message) pair; keep only the message
                val words = kafkaLines.map(tuple => tuple._2)
                val wordPairs = words.map(word => (word, 1))
                val reduceWords = wordPairs.reduceByKey((a, b) => a + b)
                // Run a SQL query over the RDD of every batch
                reduceWords.foreachRDD { rdd =>
                  val sqlContext = new SQLContext(rdd.sparkContext)
                  import sqlContext.implicits._
                  val df = rdd.map(record => Words(record._1, record._2))
                  val dfNew = sqlContext.createDataFrame(df)
                  dfNew.registerTempTable("Words")
                  val data = sqlContext.sql("select wordName from Words")
                  data.foreach(row => println(row.toString))
                }
                ssc.start()
                ssc.awaitTermination()
              }
            }
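
The update function from step 2 can be exercised without a cluster. The following plain-Scala sketch is only an approximation of what updateStateByKey does across batches: the `step` helper and the sample batches are hypothetical and introduced here for illustration, while `updateRunningSum` mirrors the function in the recipe. It assumes the update function is invoked for every key that has either new values or existing state, with an empty sequence when a batch contains nothing for that key.

```scala
object UpdateStateSketch {
  // Mirrors the recipe's update function: add the number of new values to the running count
  def updateRunningSum(values: Seq[Long], state: Option[Long]): Option[Long] =
    Some(state.getOrElse(0L) + values.size)

  // Hypothetical helper: applies the update function to one batch of (key, value) pairs.
  // Keys for which the function returns None would be dropped from the state.
  def step(state: Map[String, Long], batch: Seq[(String, Long)]): Map[String, Long] = {
    val newValues = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      updateRunningSum(newValues.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq("spark" -> 1L, "kafka" -> 1L, "spark" -> 1L),
      Seq("spark" -> 1L),
      Seq.empty[(String, Long)]
    )
    // Fold the batches in arrival order, carrying the state from one batch to the next
    val finalState = batches.foldLeft(Map.empty[String, Long])(step)
    println(finalState)
  }
}
```

Because the function uses `values.size` rather than the sum of the values, the state is a count of occurrences per key, regardless of the numbers paired with each key.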
    

How it works…

The new StreamingContext(SparkContext, Seconds(1)) line instantiates the StreamingContext, which takes a SparkContext and the batch interval as parameters. StreamingContext.socketTextStream(<ip>,<port-number>) initializes a socket stream, which connects to the specified host and port and reads messages from it. This creates a DStream (discretized stream), which is a sequence of RDDs, one generated for each batch.

When working with windowed/stateful transformations, lines.window(Seconds(4),Seconds(2)) creates a window of 4 seconds with a sliding duration of 2 seconds on the incoming DStream lines. Window-based transformations such as reduceByKeyAndWindow and countByWindow use data or intermediate results from previous batches to compute the results of the current batch. The updateStateByKey transformation produces a DStream of (key, state) pairs using the specified function, which indicates how to update the state for each key given new values.
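
To make the window arithmetic concrete, here is a plain-Scala sketch that models each batch as a sequence and shows which batches fall into each window, plus an incremental sum in the spirit of reduceByKeyAndWindow with an inverse function. The `windows` and `incrementalSums` helpers are hypothetical names, not Spark APIs, and the sketch assumes the window and slide are expressed as a number of batches with the slide no larger than the window.

```scala
object WindowSketch {
  // Each element of `batches` holds the records of one batch interval.
  // Returns, for every slide-th batch, the records of the last `window` batches.
  def windows[A](batches: Seq[Seq[A]], window: Int, slide: Int): Seq[Seq[A]] =
    (slide to batches.length by slide).map { end =>
      batches.slice(math.max(0, end - window), end).flatten
    }

  // Incremental window sums: new sum = previous sum + batches that entered
  // the window - batches that left it (the role of the inverse function).
  def incrementalSums(batches: Seq[Seq[Int]], window: Int, slide: Int): Seq[Int] = {
    val perBatch = batches.map(_.sum)
    (slide to batches.length by slide).scanLeft(0) { (prev, end) =>
      val entered = perBatch.slice(end - slide, end).sum
      val left = perBatch.slice(math.max(0, end - slide - window), math.max(0, end - window)).sum
      prev + entered - left
    }.tail
  }

  def main(args: Array[String]): Unit = {
    // Six one-second batches, a 4-batch window, and a 2-batch slide
    val batches = Seq(Seq(1), Seq(2), Seq(3), Seq(4), Seq(5), Seq(6))
    println(windows(batches, 4, 2))
    println(incrementalSums(batches, 4, 2))
  }
}
```

The incremental form touches only the batches that enter or leave the window, which is why the inverse-function variant needs the previous result to be kept around and therefore relies on check-pointing.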

In the case of applying SQL operations on streaming data, KafkaUtils.createStream initializes a DStream from Kafka. It takes the StreamingContext, the ZooKeeper host and port as a single host:port string (blrovh:2181), the consumer group name (test-consumer-group), and the topic map (Map("test" -> 1)) as parameters. The new SQLContext line creates a SQLContext, and SQLContext.createDataFrame creates a DataFrame for each RDD of the DStream. The SQL queries are then executed against that DataFrame.

There's more…

Spark Streaming uses a micro-batch architecture, in which the streaming computation is a continuous series of batch computations on small batches of data. For each input source, Spark Streaming launches receivers, which are long-running tasks within the application's executors. A receiver collects the data, replicates it to another executor for fault tolerance, and saves it as RDDs. With the Kafka direct streaming approach, the DStream is created as KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet). For fault tolerance, check-pointing periodically saves the state to a reliable filesystem.

Also, when running SQL queries on streaming data, the data can be retained for a specific duration (see StreamingContext.remember) so that a query has time to complete. As in Spark batch processing, streams of data can be persisted either in memory or on disk. In the case of stateful operations, the data is persisted in memory by default.

See also

For more details on Spark Streaming, internal architecture, check-pointing, performance tuning, and receiver parallelism, please refer to the following:
