Apache Spark for Data Science Cookbook

You're reading from Apache Spark for Data Science Cookbook: Solve real-world analytical problems

Product type Paperback
Published in Dec 2016
Publisher
ISBN-13 9781785880100
Length 392 pages
Edition 1st Edition
Authors (2): Padma Priya Chitturi, Nagamallikarjuna Inelu

Creating broadcast variables and accumulators

This recipe shows how to use accumulators and broadcast variables. Accumulators are used to aggregate values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. The other type of shared variable is the broadcast variable, which allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. Such variables are used in cases where the application needs to send a large, read-only lookup table to all the nodes.

Getting ready

To step through this recipe, you will need a running Spark cluster, either in pseudo-distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.

How to do it…

  1. Take a worker log file from the Spark log directory, $SPARK_HOME/logs. Its filename looks like this: spark-padma-org.apache.spark.deploy.worker.Worker-1-blrrndtipdl19. Place this file in HDFS. The log file contains Spark log entries at different trace levels, such as DEBUG, INFO, WARN, and ERROR. The sample data looks as follows:
    [Image: sample lines from the worker log]
  2. Let's work with an accumulator now:
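            import org.apache.spark.SparkContext

            val sc = new SparkContext()
            val logFile = sc.textFile(
              "hdfs://namenodeHostName:8020/data/spark-worker-Worker1.out")
            // sc.accumulator is the Spark 1.x API; it is deprecated since
            // Spark 2.0 in favor of sc.longAccumulator.
            val errorLines = sc.accumulator(0)
            val debugLines = logFile.map { line =>
              // Side effect: count ERROR lines while passing every line through.
              if (line.contains("ERROR")) errorLines += 1
              line
            }.filter(_.contains("DEBUG"))
            debugLines.saveAsTextFile(
              "hdfs://namenodeHostName:8020/data/out/debugLines.txt")
            // Read the accumulator only after the action above has run.
            println("ERROR Lines: " + errorLines.value)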
    
  3. Now create a broadcast variable and use it in the workers as follows:
          import org.apache.spark.SparkContext

          val sc = new SparkContext()
          // Read-only lookup table of standard temperatures per location.
          val broadCastedTemperatures = sc.broadcast(Map("KOCHI" -> 22,
            "BNGLR" -> 22, "HYD" -> 24, "MUMBAI" -> 21, "DELHI" -> 17,
            "NOIDA" -> 19, "SIMLA" -> 9))
          val inputRdd = sc.parallelize(Array(("BNGLR", 20), ("BNGLR", 16),
            ("KOCHI", -999), ("SIMLA", -999), ("DELHI", 19), ("DELHI", -999),
            ("MUMBAI", 27), ("MUMBAI", -999), ("HYD", 19), ("HYD", 25),
            ("NOIDA", -999)))
          // -999 marks a missing reading. Replace it with the standard
          // temperature; a location absent from the table keeps -999.
          val replacedRdd = inputRdd.map { case (location, temperature) =>
            val standardTemperatures = broadCastedTemperatures.value
            if (temperature == -999)
              (location, standardTemperatures.getOrElse(location, temperature))
            else (location, temperature)
          }
          val locationsWithMaxTemperatures =
            replacedRdd.reduceByKey { (temp1, temp2) =>
              if (temp1 > temp2) temp1 else temp2 }
          // Trigger the computation and print the result on the driver.
          locationsWithMaxTemperatures.collect().foreach(println)
    

How it works…

When working with accumulators, we first created an Accumulator[Int] called errorLines and added 1 to it whenever we saw a line that contained ERROR. The correct count for errorLines is available only after the saveAsTextFile() action runs: the map() transformation is lazy, so its side effect of incrementing the accumulator happens only when saveAsTextFile() forces the map() to execute. sc.accumulator returns an org.apache.spark.Accumulator[T] object, where T is the type of the value.
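
The effect of laziness on an accumulator can be seen in a small local-mode sketch. It is not part of the original recipe: the object name, the in-memory sample lines, and the local[2] master are assumptions made for illustration, and it uses the same Spark 1.x/2.x sc.accumulator API as the recipe.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; not part of the original recipe.
object AccumulatorLazinessSketch {
  // Returns the accumulator value before and after an action runs.
  def run(): (Int, Int) = {
    val conf = new SparkConf()
      .setAppName("accumulator-laziness-sketch")
      .setMaster("local[2]") // assumption: local mode, for illustration only
    val sc = new SparkContext(conf)
    try {
      // Assumed sample data standing in for the worker log.
      val lines = sc.parallelize(Seq(
        "INFO start", "ERROR disk full", "DEBUG retry", "ERROR timeout"))
      val errorLines = sc.accumulator(0)
      val tagged = lines.map { line =>
        if (line.contains("ERROR")) errorLines += 1
        line
      }
      val before = errorLines.value // map() has not executed yet
      tagged.count()                // the action forces map() to run
      val after = errorLines.value
      (before, after)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println("Before action: " + before)
    println("After action: " + after)
  }
}
```

Note that running a second action on the same uncached RDD would execute map() again and increment the accumulator a second time, so counts taken inside transformations are best treated as debugging aids.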

For broadcast variables, SparkContext.broadcast creates a broadcast variable of type Broadcast[T], where T can be any serializable type. The value of the broadcast variable is accessed through its value property. The variable is sent to each node only once and is read-only.

There's more…

Spark has support for custom accumulator types. They need to extend AccumulatorParam.
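
As a rough illustration of a custom accumulator type, the following sketch counts log lines per trace level in a Map. It assumes Spark 1.x or 2.x, where AccumulatorParam is available (it is deprecated from Spark 2.0 onward). The names LevelCountParam and CustomAccumulatorSketch, the sample lines, and the local[2] master are hypothetical.

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical custom accumulator: merges per-level counts held in a Map.
object LevelCountParam extends AccumulatorParam[Map[String, Int]] {
  // The neutral element for merging: an empty map.
  def zero(initialValue: Map[String, Int]): Map[String, Int] = Map.empty

  // Merge two partial results by summing the counts key by key.
  def addInPlace(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (level, n)) =>
      acc.updated(level, acc.getOrElse(level, 0) + n)
    }
}

object CustomAccumulatorSketch {
  def run(): Map[String, Int] = {
    val conf = new SparkConf()
      .setAppName("custom-accumulator-sketch")
      .setMaster("local[2]") // assumption: local mode, for illustration only
    val sc = new SparkContext(conf)
    try {
      val levels = Seq("DEBUG", "INFO", "WARN", "ERROR")
      // Assumed sample data standing in for the worker log.
      val lines = sc.parallelize(Seq(
        "INFO start", "ERROR disk full", "DEBUG retry", "ERROR timeout"))
      val levelCounts = sc.accumulator(Map.empty[String, Int])(LevelCountParam)
      // foreach is an action, so the updates are applied when it runs.
      lines.foreach { line =>
        levels.find(level => line.contains(level))
          .foreach(level => levelCounts += Map(level -> 1))
      }
      levelCounts.value
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The updates are made inside foreach(), an action, rather than inside a transformation; Spark applies each task's accumulator updates only once for actions.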

Also, when working with broadcast variables, it is essential to choose a serialization format that is both fast and compact, because the value has to be serialized and shipped to every node.
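
One way to act on this advice is to switch Spark from its default Java serialization to Kryo through the spark.serializer setting. The sketch below is a minimal illustration under stated assumptions, not the authors' configuration: the object name, the two-entry lookup table, and the local[2] master are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object; not part of the original recipe.
object KryoBroadcastSketch {
  // Returns the configured serializer and the looked-up temperatures.
  def run(): (String, Array[(String, Int)]) = {
    val conf = new SparkConf()
      .setAppName("kryo-broadcast-sketch")
      .setMaster("local[2]") // assumption: local mode, for illustration only
      // Use Kryo instead of the default Java serialization.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      // Small lookup table taken from the recipe's temperature data.
      val standardTemperatures = sc.broadcast(Map("KOCHI" -> 22, "SIMLA" -> 9))
      val looked = sc.parallelize(Seq("KOCHI", "SIMLA"))
        .map(city => (city, standardTemperatures.value(city)))
        .collect()
      (sc.getConf.get("spark.serializer"), looked)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (serializer, temperatures) = run()
    println(serializer)
    temperatures.foreach(println)
  }
}
```

For application-specific classes, registering them with SparkConf.registerKryoClasses usually makes the serialized form more compact; that step is omitted here because the lookup table holds only strings and integers.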

Tip

For more information on broadcast variables, please refer to http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables.

See also

Please visit the earlier Working with the Spark programming model, Working with Spark's Python and Scala shells, and Working with pair RDDs recipes to get familiar with Spark.
