Working with pair RDDs
This recipe shows how to work with RDDs of key/value pairs. Key/value RDDs, known as pair RDDs, are widely used to perform aggregations. We'll do some initial ETL to get the data into key/value format and then see how to apply transformations both on a single pair RDD and across two pair RDDs.
Getting ready
To step through this recipe, you will need a running Spark cluster, either in pseudo-distributed mode or in one of the other distributed modes, that is, standalone, YARN, or Mesos. It can also be run in local mode.
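For a quick local run, the examples that follow can be tried directly in the Spark shell; for instance, the following command (assuming Spark's bin directory is on your PATH) starts it in local mode, with the SparkContext available as sc:

$ spark-shell --master local[*]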
How to do it…
- We can create a pair RDD from a collection of strings in the following way:
val baseRdd = sc.parallelize(Array("this,is,a,ball", "it,is,a,cat", "john,is,in,town,hall"))
val inputRdd = sc.makeRDD(List(("is", 2), ("it", 2), ("cat", 8), ("this", 6), ("john", 5), ("a", 1)))
val wordsRdd = baseRdd.flatMap(record => record.split(","))
val wordPairs = wordsRdd.map(word => (word, word.length))
val filteredWordPairs = wordPairs.filter { case (word, length) => length >= 2 }
- Also, pair RDDs can be created from HDFS input files. Let's take a text file that contains stock data, as follows:
IBM,20160113,133.5,134.279999,131.100006,131.169998,4672300
GOOG,20160113,730.849976,734.73999,698.609985,700.559998,2468300
MSFT,20160113,53.799999,54.07,51.299999,51.639999,66119000
MSFT,20160112,52.759998,53.099998,52.060001,52.779999,35650700
YHOO,20160113,30.889999,31.17,29.33,29.440001,16593700
- Now, creating pair RDDs for the preceding data looks like this:
val textFile = sc.textFile("hdfs://namenodeHostName:8020/data/stocks.txt")
val stocksPairRdd = textFile.map { record =>
  val colData = record.split(",")
  (colData(0), colData(6).toLong)   // (symbol, daily traded volume)
}
- Let's apply transformations on pair RDDs as follows:
val stocksGroupedRdd = stocksPairRdd.groupByKey()
val stocksReducedRdd = stocksPairRdd.reduceByKey((x, y) => x + y)
val subtractedRdd = wordPairs.subtractByKey(inputRdd)
val cogroupedRdd = wordPairs.cogroup(inputRdd)
val joinedRdd = filteredWordPairs.join(inputRdd)
val sortedRdd = wordPairs.sortByKey()
val leftOuterJoinRdd = inputRdd.leftOuterJoin(filteredWordPairs)
val rightOuterJoinRdd = wordPairs.rightOuterJoin(inputRdd)
val flatMapValuesRdd = filteredWordPairs.flatMapValues(length => 1 to 5)
val mapValuesRdd = wordPairs.mapValues(length => length * 2)
val keys = wordPairs.keys
val values = filteredWordPairs.values
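Once these are defined, a quick way to inspect the intermediate results is to bring small samples back to the driver (a minimal check using the RDDs defined above; avoid collecting large datasets):

wordPairs.take(5).foreach(println)           // e.g. (this,4), (is,2), (a,1), ...
filteredWordPairs.take(5).foreach(println)   // single-character words such as (a,1) are filtered out
stocksReducedRdd.collect().foreach(println)  // total traded volume per symbol, e.g. (MSFT,101769700)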
How it works…
The usage of the various pair RDD transformations is as follows:
- groupByKey groups the values of the RDD by key.
- reduceByKey performs an aggregation on the grouped values corresponding to a key.
- subtractByKey removes the tuples in the first RDD whose key matches a key in the other RDD.
- join performs an inner join, pairing the values from both RDDs for each key that is present in both.
- cogroup does a similar job to join, but instead of pairing individual values, it first groups the values per key in the first RDD and then in the other RDD, returning both groups for each key.
- leftOuterJoin and rightOuterJoin work similarly to join, with a slight variation: leftOuterJoin includes all the records from the left RDD, and if no matching record is found in the right RDD, the corresponding value is represented as None; rightOuterJoin does the same for the right RDD.
- mapValues applies a function to each of the values of the pair RDD without changing the key.
- flatMapValues applies a function that returns an iterator to each value of a pair RDD, and for each element returned, it produces a key/value entry with the old key.
- keys and values return, respectively, all the keys and all the values of a pair RDD.
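The difference between join and the outer joins is easiest to see on the sample data above (a quick check in the shell; output order may vary across partitions):

joinedRdd.collect().foreach(println)
// only keys present in both RDDs, e.g. (cat,(3,8)), (this,(4,6)), (john,(4,5))

leftOuterJoinRdd.collect().foreach(println)
// every key from inputRdd; values missing on the right appear as None, e.g. (a,(1,None))

stocksGroupedRdd.mapValues(_.toList).collect().foreach(println)
// groupByKey keeps all values per key, e.g. (MSFT,List(66119000, 35650700))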
There's more…
There are other pair RDD transformations, such as foldByKey, combineByKey, and aggregateByKey, and actions such as countByKey and countByValue, along with the regular actions such as count, first, take, and so on. Pair RDD transformations that combine values by key (such as groupByKey, reduceByKey, and the join family) involve a shuffle operation, which moves data across the partitions. To know more about how the shuffle operation works and its performance impact, please refer to http://spark.apache.org/docs/latest/programming-guide.html#working-with-key-value-pairs.
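As a brief illustration of two of these, the following sketch (assuming the stocksPairRdd defined earlier in this recipe) computes the maximum daily volume per symbol with aggregateByKey and counts the records per symbol with countByKey:

val maxVolumePerSymbol = stocksPairRdd.aggregateByKey(0L)(
  (acc, volume) => math.max(acc, volume),  // merge a value into the per-partition accumulator
  (a, b) => math.max(a, b)                 // merge accumulators across partitions
)
val recordsPerSymbol = stocksPairRdd.countByKey()  // action: returns a Map of symbol -> count to the driver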
See also
The Working with the Spark programming model and Working with Spark's Python and Scala shells recipes explain how to work with RDDs and how to make use of Spark shell for testing the application logic.