We will now use the ideas we introduced in the previous section to write a basic Spark program to manipulate a dataset. We will start with Scala and then write the same program in Java and Python. Our program will be based on exploring some data from an online store, about which users have purchased which products. The data is contained in a Comma-Separated-Value (CSV) file called UserPurchaseHistory.csv. This file is expected to be in the data directory.
The contents are shown in the following snippet. The first column of the CSV is the username, the second column is the product name, and the final column is the price:
John,iPhone Cover,9.99
John,Headphones,5.49
Jack,iPhone Cover,9.99
Jill,Samsung Galaxy Cover,8.95
Bob,iPad Cover,5.49
For our Scala program, we need to create two files-our Scala code and our project build configuration file-using the build tool Scala Build Tool (SBT). For ease of use, we recommend that you use -spark-app for this chapter. This code also contains the CSV file under the data directory. You will need SBT installed on your system in order to run this example program (we use version 0.13.8 at the time of writing this book).
Our SBT configuration file, build.sbt, looks like this (note that the empty lines between each line of code are required):
name := "scala-spark-app"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"
The last line adds the dependency on Spark to our project.
Our Scala program is contained in the ScalaApp.scala file. We will walk through the program piece by piece. First, we need to import the required Spark classes:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
/**
* A simple Spark app in Scala
*/
object ScalaApp {
In our main method, we need to initialize our SparkContext object and use this to access our CSV data file with the textFile method. We will then map the raw text by splitting the string on the delimiter character (a comma in this case) and extracting the relevant records for username, product, and price:
def main(args: Array[String]) {
val sc = new SparkContext("local[2]", "First Spark App")
// we take the raw data in CSV format and convert it into a
set of records of the form (user, product, price)
val data = sc.textFile("data/UserPurchaseHistory.csv")
.map(line => line.split(","))
.map(purchaseRecord => (purchaseRecord(0),
purchaseRecord(1), purchaseRecord(2)))
Now that we have an RDD, where each record is made up of (user, product, price), we can compute various interesting metrics for our store, such as the following ones:
- The total number of purchases
- The number of unique users who purchased
- Our total revenue
- Our most popular product
Let's compute the preceding metrics:
// let's count the number of purchases
val numPurchases = data.count()
// let's count how many unique users made purchases
val uniqueUsers = data.map{ case (user, product, price) => user
}.distinct().count()
// let's sum up our total revenue
val totalRevenue = data.map{ case (user, product, price) =>
price.toDouble }.sum()
// let's find our most popular product
val productsByPopularity = data
.map{ case (user, product, price) => (product, 1) }
.reduceByKey(_ + _)
.collect()
.sortBy(-_._2)
val mostPopular = productsByPopularity(0)
This last piece of code to compute the most popular product is an example of the Map/Reduce pattern made popular by Hadoop. First, we mapped our records of (user, product, price) to the records of (product, 1). Then, we performed a reduceByKey operation, where we summed up the 1s for each unique product.
Once we have this transformed RDD, which contains the number of purchases for each product, we will call collect, which returns the results of the computation to the driver program as a local Scala collection. We will then sort these counts locally (note that in practice, if the amount of data is large, we will perform the sorting in parallel, usually with a Spark operation such as sortByKey).
Finally, we will print out the results of our computations to the console:
println("Total purchases: " + numPurchases)
println("Unique users: " + uniqueUsers)
println("Total revenue: " + totalRevenue)
println("Most popular product: %s with %d
purchases".format(mostPopular._1, mostPopular._2))
}
}
We can run this program by running sbt run in the project's base directory or by running the program in your Scala IDE if you are using one. The output should look similar to the following:
...
[info] Compiling 1 Scala source to ...
[info] Running ScalaApp
...
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases
We can see that we have 5 purchases from four different users with total revenue of 39.91. Our most popular product is an iPhone cover with 2 purchases.