SparkR is an R package that provides a frontend for using Apache Spark from R. As of Spark 1.6.0, SparkR provides a distributed data frame implementation for working with large datasets. SparkR also supports distributed machine learning using MLlib; this is something you should try out while reading the machine learning chapters.
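As a quick preview of what that looks like, here is a minimal sketch (assuming a Spark 2.x installation; the spark.glm call and R's built-in faithful dataset are used purely for illustration and are not part of this chapter's example):
library(SparkR)
sparkR.session(master = "local")   # start a local SparkR session (Spark 2.x API)
df <- createDataFrame(faithful)    # distribute R's built-in faithful dataset
# fit a Gaussian GLM with MLlib on the distributed data
model <- spark.glm(df, waiting ~ eruptions, family = "gaussian")
summary(model)                     # inspect the fitted coefficients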
The first step to a Spark program in R
SparkR DataFrames
A DataFrame is a distributed collection of data organized into named columns. This concept is very similar to a table in a relational database or a data frame in R, but with much richer optimizations under the hood. The source of these data frames can be a CSV or TSV file, a Hive table, a local R data frame, and so on.
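For instance, the following short sketch (again assuming a Spark 2.x SparkR session; R's built-in mtcars dataset is used purely for illustration) creates a DataFrame from a local R data frame and applies some basic column operations:
df <- createDataFrame(mtcars)     # distribute a local R data frame
printSchema(df)                   # named columns with inferred types
head(select(df, "mpg", "cyl"))    # project two named columns
head(filter(df, df$mpg > 25))     # filter rows with a column predicate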
The SparkR shell can be launched from the root of the Spark distribution using the ./bin/sparkR command.
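For example, from the root of your Spark installation:
$ ./bin/sparkR
This starts an interactive R session with the SparkR package already attached.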
Following on from the preceding examples, we will now write an R version. We assume that you have R (version 3.0.2, "Frisbee Sailing", or higher) and RStudio installed on your system (most Linux and Mac OS X systems make R straightforward to install through their package managers).
The example program is included in the sample code for this chapter, in the directory named r-spark-app, which also contains the CSV data file under the data subdirectory. The project contains a script, r-script-01.R, which is shown in the following listing. Make sure you change PATH to the appropriate value for your environment.
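# point SPARK_HOME at your local Spark installation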
Sys.setenv(SPARK_HOME = "/PATH/spark-2.0.0-bin-hadoop2.7")
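# add the SparkR package shipped with Spark to R's library search path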
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"),
.libPaths()))
# load the SparkR library
library(SparkR)
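# initialize a local Spark context, loading the spark-csv package, and create a SQL context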
sc <- sparkR.init(master = "local", sparkPackages = "com.databricks:spark-csv_2.11:1.5.0")
sqlContext <- sparkRSQL.init(sc)
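# read the user purchase CSV file (no header row) into a DataFrame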
user.purchase.history <- "/PATH/ml-resources/spark-ml/Chapter_01/r-spark-app/data/UserPurchaseHistory.csv"
data <- read.df(sqlContext, user.purchase.history, "com.databricks.spark.csv", header="false")
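# preview the first rows and count the records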
head(data)
count(data)
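# parse each CSV record into name, product, and price fields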
parseFields <- function(record) {
Sys.setlocale("LC_ALL", "C") # necessary for strsplit() to work correctly
parts <- strsplit(as.character(record), ",")
list(name=parts[1], product=parts[2], price=parts[3])
}
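# run the parser over the data with SparkR's private RDD API and cache the result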
parsedRDD <- SparkR:::lapply(data, parseFields)
cache(parsedRDD)
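# count the total number of purchase records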
numPurchases <- count(parsedRDD)
sprintf("Number of Purchases : %d", numPurchases)
getName <- function(record){
record[1]
}
getPrice <- function(record){
record[3]
}
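# extract the buyer names, collect them to the driver, and find the unique users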
nameRDD <- SparkR:::lapply(parsedRDD, getName)
nameRDD <- collect(nameRDD)
head(nameRDD)
uniqueUsers <- unique(nameRDD)
head(uniqueUsers)
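# extract the prices as numbers and sum them with a distributed reduce to get the total revenue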
priceRDD <- SparkR:::lapply(parsedRDD, function(x) { as.numeric(x$price[1])})
take(priceRDD,3)
totalRevenue <- SparkR:::reduce(priceRDD, "+")
sprintf("Total Revenue : %.2f", s)
products <- SparkR:::lapply(parsedRDD, function(x) { list( toString(x$product[1]), 1) })
take(products, 5)
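# sum the counts per product; 2L sets the number of partitions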
productCount <- SparkR:::reduceByKey(products, "+", 2L)
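# swap each (product, count) pair to (count, product)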
productsCountAsKey <- SparkR:::lapply(productCount, function(x) { list( as.integer(x[2][1]), x[1][1])})
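# count the distinct products and read the product name off the last collected pair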
productCount <- count(productsCountAsKey)
mostPopular <- toString(collect(productsCountAsKey)[[productCount]][[2]])
sprintf("Most Popular Product : %s", mostPopular)
Run the script with the following command in a bash terminal:
$ Rscript r-script-01.R
Your output will be similar to the following listing:
> sprintf("Number of Purchases : %d", numPurchases)
[1] "Number of Purchases : 5"
> uniqueUsers <- unique(nameRDD)
> head(uniqueUsers)
[[1]]
[[1]]$name
[[1]]$name[[1]]
[1] "John"
[[2]]
[[2]]$name
[[2]]$name[[1]]
[1] "Jack"
[[3]]
[[3]]$name
[[3]]$name[[1]]
[1] "Jill"
[[4]]
[[4]]$name
[[4]]$name[[1]]
[1] "Bob"
> sprintf("Total Revenue : %.2f", totalRevenueNum)
[1] "Total Revenue : 39.91"
> sprintf("Most Popular Product : %s", mostPopular)
[1] "Most Popular Product : iPad Cover"