In general, every Spark application consists of a driver program that runs the application's main logic and executes parallel operations on a cluster. In accordance with this definition, the main abstraction provided by the core Spark framework is the RDD (Resilient Distributed Dataset): an immutable collection of data that is partitioned across the machines of a cluster, so that operations on it can happen in parallel.
Two types of operations are available on an RDD:
- A transformation is an operation on an RDD that produces another RDD.
- An action is an operation that triggers a computation and returns a value to the driver program (or persists the result to a storage system).

Transformations are lazy: they aren't executed until an action is invoked. This is one of Spark's strengths. Spark remembers the lineage of transformations that have been applied to an RDD, so if a partition is lost (for example, because a slave node goes down), it can easily be rebuilt on some other node of the cluster.
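As a minimal, hedged sketch of this laziness (the input path below is hypothetical), nothing is read from disk until the final count() action is invoked:

val lines = sc.textFile("/tmp/sample.txt")    // transformation: nothing is read yet
val longLines = lines.filter(_.length > 80)   // transformation: still no computation
val howMany = longLines.count()               // action: the file is read and the whole lineage is executed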
The following table lists some of the common transformations supported by Spark:
| Transformation | Purpose |
| --- | --- |
| map(func) | Returns a new RDD by applying the func function to each data element of the source RDD. |
| filter(func) | Returns a new RDD containing only those data elements for which the applied func function returns true. |
| flatMap(func) | Similar to map, with the difference that each input item can be mapped to zero or more output items (the applied func function should return a Seq). |
| union(otherRdd) | Returns a new RDD that contains the union of the elements in the source RDD and the otherRdd argument. |
| distinct([numPartitions]) | Returns a new RDD that contains only the distinct elements of the source RDD. |
| groupByKey([numPartitions]) | When called on an RDD of (K, V) pairs, it returns an RDD of (K, Iterable<V>) pairs. By default, the level of parallelism in the output RDD depends on the number of partitions of the source RDD. You can pass an optional numPartitions argument to set a different number of partitions. |
| reduceByKey(func, [numPartitions]) | When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs, where the values for each key are aggregated using the given reduce func function, which must be of type (V, V) => V. As with groupByKey, the number of reduce partitions is configurable through an optional second numPartitions argument. |
| sortByKey([ascending], [numPartitions]) | When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs sorted by key in ascending or descending order, as specified by the Boolean ascending argument. The number of partitions of the output RDD is configurable through an optional second numPartitions argument. |
| join(otherRdd, [numPartitions]) | When called on RDDs of type (K, V) and (K, W), it returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin. The number of partitions of the output RDD is configurable through an optional second numPartitions argument. |
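To make the preceding table more concrete, here's a short, hedged sketch that chains a few of these transformations on small, made-up RDDs (it can be pasted into the Spark shell):

val numbers = sc.parallelize(Seq(1, 2, 2, 3, 4))
val doubled = numbers.map(_ * 2)                         // map: 2, 4, 4, 6, 8
val evens   = doubled.filter(_ % 4 == 0)                 // filter: 4, 4, 8
val merged  = doubled.union(evens).distinct()            // union + distinct: 2, 4, 6, 8

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val sums    = pairs.reduceByKey(_ + _)                   // reduceByKey: ("a", 4), ("b", 2)
val joined  = sums.join(sc.parallelize(Seq(("a", "x")))) // join: ("a", (4, "x"))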
The following table lists some of the common actions supported by Spark:
| Action | Purpose |
| --- | --- |
| reduce(func) | Aggregates the elements of an RDD using the given func function (which takes two arguments and returns one). So that it can be computed correctly in parallel, the func function must be commutative and associative. |
| collect() | Returns all the elements of an RDD as an array to the driver. |
| count() | Returns the total number of elements in an RDD. |
| first() | Returns the first element of an RDD. |
| take(n) | Returns an array containing the first n elements of an RDD. |
| foreach(func) | Executes the func function on each element of an RDD. |
| saveAsTextFile(path) | Writes the elements of an RDD as a text file in a given directory (with the location specified through the path argument) in the local filesystem, HDFS, or any other Hadoop-supported filesystem. |
| countByKey() | This action is only available on RDDs of type (K, V). It returns a hashmap of (K, Int) pairs, where the value is the count for each key K of the source RDD. |
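Similarly, here's a hedged sketch of some of these actions on a small, made-up RDD; the values in the comments are what you would expect to get back in the driver:

val rdd = sc.parallelize(Seq(3, 1, 4, 1, 5))
rdd.count()        // 5
rdd.first()        // 3
rdd.take(2)        // Array(3, 1)
rdd.reduce(_ + _)  // 14: the sum, since + is commutative and associative
sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).countByKey()  // Map(a -> 2, b -> 1)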
Now, let's understand the concepts of transformations and actions through an example that can be executed in the Scala shell: finding the N most commonly used words in an input text file. The following diagram depicts a potential implementation of this problem:
Figure 1.6
Let's translate this into code.
First of all, let's load the content of a text file into an RDD of strings:
scala> val spiderman = sc.textFile("/usr/spark-2.2.1/tests/spiderman.txt")
spiderman: org.apache.spark.rdd.RDD[String] = /usr/spark-2.2.1/tests/spiderman.txt MapPartitionsRDD[1] at textFile at <console>:24
Then, we will apply the necessary transformations and actions:
scala> val topWordCount = spiderman.flatMap(str=>str.split(" ")).filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case(word, count) => (count, word)}.sortByKey(false)
topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[9] at sortByKey at <console>:26
Here, we have the following:
- flatMap(str=>str.split(" ")): Splits each line into single words
- filter(!_.isEmpty): Removes empty strings
- map(word=>(word,1)): Maps each word into a key-value pair
- reduceByKey(_+_): Aggregates the count
- map{case(word, count) => (count, word)}: Reverses the (word, count) pairs to (count, word)
- sortByKey(false): Sorts the pairs in descending order of their keys (the counts)
Finally, print the five most used words in the input content to the console:
scala> topWordCount.take(5).foreach(x=>println(x))
(34,the)
(28,and)
(19,of)
(19,in)
(16,Spider-Man)
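As a side note, flipping the pairs and sorting the whole RDD isn't the only way to get the top N words. Here's a hedged alternative sketch that keeps the (word, count) pairs and uses the takeOrdered action with a custom ordering instead:

val wordCounts = spiderman.flatMap(_.split(" "))
  .filter(!_.isEmpty)
  .map(word => (word, 1))
  .reduceByKey(_ + _)
// takeOrdered returns the smallest elements according to the given ordering,
// so ordering by the negated count yields the five most frequent words.
val top5 = wordCounts.takeOrdered(5)(Ordering.by((pair: (String, Int)) => -pair._2))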
The same could be achieved in Python in the following way:
from operator import add
spiderman = spark.read.text("/usr/spark-2.2.1/tests/spiderman.txt")
lines = spiderman.rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add) \
.map(lambda x: (x[1],x[0])) \
.sortByKey(False)
The result, of course, is the same as for the Scala example (ties on the count may appear in a different order):
>>> counts.take(5)
[(34, 'the'), (28, 'and'), (19, 'in'), (19, 'of'), (16, 'Spider-Man')]
Spark can persist RDDs (and Datasets as well) in memory while executing operations on them. In Spark, caching and persisting are essentially synonyms (the cache() method is simply persist() with the default storage level). When persisting an RDD, each node of the cluster stores in memory the partitions of the RDD that it computes and reuses them in further actions on the same RDD (or on RDDs that have been derived from it through some transformations). This is the reason why subsequent actions execute much faster.

It is possible to mark an RDD to be persisted using its persist() method. The first time an action is executed on it, it will be kept in memory on the cluster's nodes. The Spark cache is fault-tolerant: if, for any reason, all of the partitions of an RDD are lost, it will be automatically recalculated using the transformations that created it.

A persisted RDD can be stored using different storage levels, which can be set by passing a StorageLevel object to the persist() method of the RDD. The following table lists all of the available storage levels and their meanings:
| Storage Level | Purpose |
| --- | --- |
| MEMORY_ONLY | This is the default storage level. It stores RDDs as deserialized Java objects in memory. If an RDD doesn't fit in memory, some of its partitions won't be cached and will be recalculated on the fly when needed. |
| MEMORY_AND_DISK | It stores RDDs as deserialized Java objects in memory first but, if an RDD doesn't fit in memory, it stores some partitions on disk (this is the main difference from MEMORY_ONLY) and reads them from there when needed. |
| MEMORY_ONLY_SER | It stores RDDs as serialized Java objects. Compared to MEMORY_ONLY, this is more space-efficient, but more CPU-intensive in read operations. This is available for JVM languages only. |
| MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER (it stores RDDs as serialized Java objects), with the main difference being that partitions that don't fit in memory are stored on disk. This is available for JVM languages only. |
| DISK_ONLY | It stores the RDD partitions on disk only. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on | The same as the corresponding preceding levels (MEMORY_ONLY, MEMORY_AND_DISK, and so on), but each partition is replicated on two cluster nodes. |
| OFF_HEAP | Similar to MEMORY_ONLY_SER, but it stores data in off-heap memory (assuming off-heap memory is enabled). Please be careful when using this storage level, as it is still experimental. |
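As a hedged illustration of persist() in practice (reusing the spiderman.txt file from the earlier example), the first action below materializes the cache and the second one is served from it:

import org.apache.spark.storage.StorageLevel

val words = sc.textFile("/usr/spark-2.2.1/tests/spiderman.txt").flatMap(_.split(" "))
words.persist(StorageLevel.MEMORY_AND_DISK)  // mark the RDD for caching, spilling to disk if it doesn't fit
words.count()     // first action: computes the RDD and populates the cache
words.count()     // second action: reuses the cached partitions, no recomputation
words.unpersist() // release the cached partitions when they're no longer needed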
When a function is passed to a Spark operation, it is executed on a remote cluster node, which works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machines are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient.
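The classic pitfall that follows from this, as shown in the hedged sketch below, is trying to update a driver-side variable from inside a closure: each executor increments its own copy, and the copy on the driver is left untouched (the exact behavior may differ in local mode):

var counter = 0
sc.parallelize(Seq(1, 2, 3, 4)).foreach(x => counter += x)
println(counter)  // on a cluster, typically still 0: only the executors' copies were updated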
However, there are two limited types of shared variables that are available in Spark for two common usage patterns – broadcast variables and accumulators.
One of the most common operations in Spark programming is to perform joins on RDDs to consolidate data by a given key. In these cases, large datasets may be sent around to the slave nodes that host the partitions to be joined. As network I/O is orders of magnitude slower than RAM access, this can easily become a serious performance bottleneck. To mitigate this issue, Spark provides broadcast variables, which are broadcast to the slave nodes so that RDD operations running on those nodes can quickly access their value. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication costs. A broadcast variable is created from a variable, v, by calling the SparkContext.broadcast(v) method. The broadcast variable is a wrapper around v, and its value can be obtained by calling the value method. Here's an example in Scala that you can run through the Spark shell:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
After its creation, the broadcast variable, broadcastVar, should be used instead of the original value, v, in any function that's executed on the cluster, so that v is not shipped to the nodes more than once. In addition, to ensure that all the nodes get the same value of the broadcast variable, v must not be modified after it has been broadcast.
Here's the code for the same example in Python:
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
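As mentioned previously, a typical use of broadcast variables is to replace a join against a small dataset with a cheap map-side lookup. The following Scala sketch is only an illustration with made-up data (countryNames and visits are hypothetical):

// A small, hypothetical lookup table, broadcast once to every node.
val countryNames = sc.broadcast(Map("IT" -> "Italy", "US" -> "United States"))

val visits = sc.parallelize(Seq(("IT", 10), ("US", 25), ("FR", 3)))
val withNames = visits.map { case (code, count) =>
  // The broadcast value is read locally on each executor: no shuffle, no join.
  (countryNames.value.getOrElse(code, "Unknown"), count)
}
withNames.collect()  // Array((Italy,10), (United States,25), (Unknown,3))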
To aggregate information across the executors in a Spark cluster, accumulator variables should be used. Because they are only added to through an associative and commutative operation, they can be supported efficiently in parallel computation. Spark natively provides support for accumulators of numeric types; they can be created by calling the SparkContext.longAccumulator() method (to accumulate values of type Long) or the SparkContext.doubleAccumulator() method (to accumulate values of type Double).
However, it is possible to programmatically add support for other types, as sketched later in this section. Any task running on a cluster can add to an accumulator using the add method, but it cannot read its value; reading is only allowed for the driver program, through the accumulator's value method. Here's a code example in Scala:
scala> val accum = sc.longAccumulator("First Long Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(First Long Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
scala> accum.value
res1: Long = 10
In this case, an accumulator has been created and a name has been assigned to it. It is possible to create unnamed accumulators, but a named accumulator is displayed in the web UI for the stage that modifies it:
Figure 1.7
This can be helpful for understanding the progress of running stages.
The same example in Python is as follows:
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10
Tracking accumulators in the web UI isn't supported for Python.
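As mentioned previously, support for types other than Long and Double can be added programmatically by extending the AccumulatorV2 abstract class and registering the instance with the SparkContext. The following Scala sketch (the class and variable names are made up) collects strings, for example error messages, from the executors:

import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.ListBuffer

class StringListAccumulator extends AccumulatorV2[String, List[String]] {
  private val buffer = ListBuffer[String]()
  override def isZero: Boolean = buffer.isEmpty
  override def copy(): StringListAccumulator = {
    val acc = new StringListAccumulator
    acc.buffer ++= buffer
    acc
  }
  override def reset(): Unit = buffer.clear()
  override def add(v: String): Unit = buffer += v
  override def merge(other: AccumulatorV2[String, List[String]]): Unit = buffer ++= other.value
  override def value: List[String] = buffer.toList
}

val errors = new StringListAccumulator
sc.register(errors, "Errors")  // custom accumulators must be registered explicitly
sc.parallelize(Seq("ok", "boom", "ok")).foreach(s => if (s == "boom") errors.add(s))
errors.value  // List(boom), read on the driver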
Please be aware that Spark guarantees accumulator updates only inside actions: each task's update is applied exactly once, even if the task is restarted. For accumulator updates performed inside transformations, there is no such guarantee; an update may be applied more than once if a task or stage is re-executed.
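The following hedged sketch (the variable names are made up, and it assumes the mapped RDD is not cached) shows the difference: the update inside the foreach action is applied exactly once per element, while the update inside the map transformation is applied again every time the RDD is recomputed:

val inAction = sc.longAccumulator("Updated in an action")
val inTransformation = sc.longAccumulator("Updated in a transformation")

val data = sc.parallelize(1 to 4)
data.foreach(x => inAction.add(x))       // action: inAction.value is now 10

val mapped = data.map { x => inTransformation.add(x); x }  // transformation: nothing happens yet
mapped.count()                           // first action: adds 1 + 2 + 3 + 4
mapped.count()                           // the map is recomputed, so the values are added again
inTransformation.value                   // 20 rather than 10, because the updates were applied twice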