Creating broadcast variables and accumulators
This recipe shows how to use accumulators and broadcast variables. Accumulators are used to aggregate values from worker nodes back to the driver program. One of the most common uses of accumulators is to count events that occur during job execution for debugging purposes. The other type of shared variable is the broadcast variable, which allows the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations. Such variables are used in cases where the application needs to send a large, read-only lookup table to all the nodes.
Getting ready
To step through this recipe, you will need a running Spark cluster either in pseudo distributed mode or in one of the distributed modes, that is, standalone, YARN, or Mesos.
How to do it…
- The log file of workers from Spark log
$SPARK_HOME/logs
is taken, whose filename looks like this:spark-padma-org.apache.spark.deploy.worker.Worker-1-blrrndtipdl19
. Place this file in HDFS. This log file contains Spark log information with different trace levels, such asDEBUG
,INFO
,WARN
, andERROR
. The sample data looks as follows: - Let's work with an accumulator now:
val sc = new SparkContext val logFile = sc.textFile("hdfs://namenodeHostName:8020/data/spark- worker-Worker1.out") val errorLines = sc.accumulator(0) val debugLines = logFile.map{line => if(line.contains("ERROR")) errorLines +=1 if(line.contains("DEBUG"))line } debugLines.saveAsTextFile("hdfs://namenodeHostName:8020/data /out/ debugLines.txt") println("ERROR Lines: "+ errorLines.value)
- Now create a broadcast variable and use it in the workers as follows:
val sc = new SparkContext val broadCastedTemperatures = sc.broadcast(Map("KOCHI" -> 22,"BNGLR" -> 22, "HYD" -> 24, "MUMBAI" -> 21, "DELHI" -> 17, "NOIDA" -> 19, "SIMLA" -> 9)) val inputRdd = sc.parallelize(Array("BNGLR",20), ("BNGLR",16), ("KOCHI",-999), ("SIMLA",-999), ("DELHI",19, ("DELHI",-999), ("MUMBAI",27), ("MUMBAI",-999), ("HYD",19), ("HYD",25), ("NOIDA",-999) ) val replacedRdd = inputRdd.map{case(location, temperature) => val standardTemperatures = broadCastedTemperatures.value if(temperature == -999 && standardTemperatures.get(location) != None) (location, standardTemperatures.get(location).get) else if(temperature != -999) (location, temperature ) } val locationsWithMaxTemperatures = replacedRdd.reduceByKey{(temp1, temp2) => if (temp1 > temp2) temp1 else temp2}
How it works…
Initially, when working with accumulators, we created Accumulator[Int]
, called errorLines
, and added 1
to it whenever we saw a line that contained ERROR
. We will see the correct count for errorLines
only after the saveAsTextFile()
action runs because the transformation map()
is lazy, so the side-effect, incrementing the accumulator happens only when the map()
is forced to occur by saveAsTextFile()
. The return type of the accumulator would be the org.apache.spark.Accumulator[T]
object where T
is the type of the value.
Well, coming to broadcast variables, SparkContext.broadcast
creates a broadcast variable of type Broadcast[T]
. T
is of any type and it should be serializable. The value of the broadcast variable is accessed using the value
property. The variable is sent to each node only once and is read-only.
There's more…
Spark has support for custom accumulator types. They need to extend AccumulatorParam
.
Tip
For additional information on this, please visit: http://spark.apache.org/docs/latest/programming-guide.html#accumulators-a-nameaccumlinka.
Also, when working with broadcast variables, it is essential to choose a serialization format which is fast and compact.
Tip
For more information on broadcast variables, please refer: http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables.
See also
Please visit the earlier Working with the Spark programming model , Working with Spark's Python and Scala shells , and Working with pair RDDs recipes to get familiar with Spark.