Working with the Spark programming model
This recipe explains the fundamentals of the Spark programming model. It covers the basics of the RDD: Spark provides a Resilient Distributed Dataset (RDD), a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. It also covers how to create RDDs and how to perform transformations and actions on them.
How to do it…
- Let's create RDDs and apply a few transformations such as map and filter, and a few actions such as count, take, top, and so on, in the Spark shell:

```scala
scala> val data = Array(1, 2, 3, 4, 5)

scala> val rddData = sc.parallelize(data)

scala> val mydata = rddData.filter(ele => ele % 2 == 0)
mydata: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:29

scala> val mydata = rddData.map(ele => ele + 2)
mydata: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:30

scala> mydata.count()
res1: Long = 5

scala> mydata.take(2)
res2: Array[Int] = Array(3, 4)

scala> mydata.top(1)
res3: Array[Int] = Array(7)
```
- Now let's work with the same transformations and actions in a Spark standalone application:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SparkTransformations {
  def main(args: Array[String]) {
    // Configure and create the SparkContext
    val conf = new SparkConf().setAppName("SparkTransformations").setMaster("spark://master:7077")
    val sc = new SparkContext(conf)

    val baseRdd1 = sc.parallelize(Array("hello", "hi", "priya", "big", "data", "hub", "hub", "hi"), 1)
    val baseRdd2 = sc.parallelize(Array("hey", "ram", "krishna", "priya"), 1)
    val baseRdd3 = sc.parallelize(Array(1, 2, 3, 4), 2)

    // Transformations: each one returns a new RDD
    val sampledRdd = baseRdd1.sample(false, 0.5)
    val unionRdd = baseRdd1.union(baseRdd2).repartition(1)
    val intersectionRdd = baseRdd1.intersection(baseRdd2)
    val distinctRdd = baseRdd1.distinct.repartition(1)
    val subtractRdd = baseRdd1.subtract(baseRdd2)
    val cartesianRdd = sampledRdd.cartesian(baseRdd2)

    // Actions: each one triggers computation and returns a result to the driver
    val reducedValue = baseRdd3.reduce((a, b) => a + b)
    val collectedRdd = distinctRdd.collect
    collectedRdd.foreach(println)
    val count = distinctRdd.count
    val first = distinctRdd.first
    println("Count is..." + count)
    println("First Element is..." + first)
    val takeValues = distinctRdd.take(3)
    val takeSample = distinctRdd.takeSample(false, 2)
    val takeOrdered = distinctRdd.takeOrdered(2)
    takeValues.foreach(println)
    println("Take Sample Values..")
    takeSample.foreach(println)
    val foldResult = distinctRdd.fold("<>")((a, b) => a + b)
    println(foldResult)
  }
}
```
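To run the standalone application, you would typically package it into a jar and launch it with spark-submit. The following is a minimal sketch; the jar name is a placeholder and the master URL should match your cluster:

```bash
# Hypothetical launch command; adjust the jar name, class name, and master URL to your setup
spark-submit \
  --class SparkTransformations \
  --master spark://master:7077 \
  spark-recipes.jar
```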
How it works…
Spark offers an abstraction called an RDD as part of its programming model. The preceding code snippets show RDD creation, transformations, and actions. Transformations such as union, subtract, intersection, sample, cartesian, map, filter, and flatMap, when applied on an RDD, result in a new RDD, whereas actions such as count, first, take(3), takeSample(false, 2), and takeOrdered(2) compute a result on the RDD and return it to the driver program or save it to external storage. Although we can define RDDs at any point, Spark computes them lazily, that is, an RDD is only materialized the first time it is used in an action.
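Lazy evaluation is easy to observe in the shell. The following is a minimal sketch (the variable names and the println inside map are purely illustrative, added only to show when the work actually runs; on a cluster the trace appears in the executor logs rather than the driver console):

```scala
val nums = sc.parallelize(1 to 5)

// Nothing is computed here; Spark only records the lineage of this transformation
val traced = nums.map { n =>
  println(s"processing $n")   // prints only when an action forces evaluation
  n * 2
}

// The action triggers the actual computation
traced.collect().foreach(println)
```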
There's more…
There are a few transformations, such as reduceByKey, groupByKey, repartition, distinct, intersection, subtract, and so on, that result in a shuffle operation. A shuffle is expensive because it involves disk I/O, data serialization, and network I/O; it can be tuned using certain configuration parameters.
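For example, a word count written with groupByKey shuffles every (word, 1) pair across the network, whereas reduceByKey combines values within each partition before the shuffle, so far less data moves. A minimal sketch of the two variants (the input data and variable names are illustrative, not from the recipe):

```scala
// Illustrative input; in practice this could come from sc.textFile(...)
val lines = sc.parallelize(Seq("big data hub", "hi big data"))
val words = lines.flatMap(_.split("\\s+")).map(word => (word, 1))

// Shuffles every (word, 1) pair, then sums on the reduce side
val countsWithGroup = words.groupByKey().mapValues(_.sum)

// Pre-aggregates within each partition before shuffling
val countsWithReduce = words.reduceByKey(_ + _)

countsWithReduce.collect().foreach(println)
```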
See also
The Apache Spark documentation offers a detailed explanation of the Spark programming model. Please refer to the programming guide at http://spark.apache.org/docs/latest/programming-guide.html.