The Java API is very similar in principle to the Scala API. However, while Scala can call Java code quite easily, in some cases it is not possible to call Scala code from Java. This is particularly the case when Scala code makes use of Scala features such as implicit conversions, default parameters, and the Scala reflection API.
Spark makes heavy use of these features in general, so it is necessary to have a separate API specifically for Java that includes Java versions of the common classes. Hence, SparkContext becomes JavaSparkContext and RDD becomes JavaRDD.
Java versions prior to Java 8 do not support anonymous functions and lack a succinct syntax for functional-style programming, so functions passed to the Spark Java API must implement one of the function interfaces in the org.apache.spark.api.java.function package (such as Function, Function2, or PairFunction), each of which defines a single call method. While this is significantly more verbose, we can create one-off anonymous classes to pass to our Spark operations, implementing the relevant interface and its call method to achieve much the same effect as anonymous functions in Scala.
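For illustration, the following is a minimal sketch (assuming a JavaSparkContext named sc has already been created) of how the CSV-splitting step used later in this chapter could be written without lambdas, using an anonymous class:
// pre-Java 8 style: an anonymous class implementing the Function interface
// from the org.apache.spark.api.java.function package
JavaRDD<String[]> records = sc.textFile("data/UserPurchaseHistory.csv")
    .map(new Function<String, String[]>() {
        @Override
        public String[] call(String line) throws Exception {
            return line.split(",");
        }
    });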
Spark provides support for Java 8's anonymous function (or lambda) syntax. Using this syntax makes a Spark program written in Java 8 look very close to the equivalent Scala program.
In Scala, an RDD of key/value pairs provides special operators (such as reduceByKey and saveAsSequenceFile) that are made available automatically via implicit conversions. In Java, dedicated RDD classes are required in order to access similar functions. These include JavaPairRDD to work with key/value pairs and JavaDoubleRDD to work with numerical records.
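As a brief sketch, assuming an RDD of String[] records named data like the one we build in the program below, these specialized RDDs are obtained through dedicated transformations rather than through implicit conversions:
// mapToPair produces a JavaPairRDD, which exposes key/value operations
// such as reduceByKey
JavaPairRDD<String, Integer> productPairs =
    data.mapToPair(fields -> new Tuple2<>(fields[1], 1));
JavaPairRDD<String, Integer> countsByProduct =
    productPairs.reduceByKey((a, b) -> a + b);
// mapToDouble produces a JavaDoubleRDD, which exposes numeric operations
// such as sum and stats
JavaDoubleRDD prices =
    data.mapToDouble(fields -> Double.parseDouble(fields[2]));
double totalSpent = prices.sum();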
We will see examples of most of these differences in the following Java program, which is included in the example code of this chapter in the directory named java-spark-app. The code directory also contains the CSV data file under the data subdirectory.
We will build and run this project with the Maven build tool, which we assume you have installed on your system.
Detailed installation instructions can be found at http://maven.apache.org/download.cgi.
The project contains a Java file called JavaApp.java, which contains our program code:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.*;
import java.util.stream.Collectors;
/**
* A simple Spark app in Java
*/
public class JavaApp {
public static void main(String[] args) {
As in our Scala example, we first need to initialize our context. Note that we use the JavaSparkContext class here instead of the SparkContext class that we used earlier. We use JavaSparkContext in the same way to access our data using textFile and then split each row into the required fields. Note how we use a lambda expression to define a split function that performs the string processing in the following code:
JavaSparkContext sc = new JavaSparkContext("local[2]",
"First Spark App");
// we take the raw data in CSV format and convert it into a
// set of records of the form (user, product, price)
JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(","));
Now, we can compute the same metrics as we did in our Scala example. Note how some methods, such as distinct and count, are the same in the Java and Scala APIs. Also note the lambda expressions that we pass to the map and reduce functions in the following code:
// let's count the number of purchases
long numPurchases = data.count();
// let's count how many unique users made purchases
long uniqueUsers = data.map(strings ->
strings[0]).distinct().count();
// let's sum up our total revenue
Double totalRevenue = data.map(strings -> Double.parseDouble(strings[2]))
    .reduce((v1, v2) -> v1 + v2);
In the following lines of code, we can see that the approach to computing the most popular product is the same as that in the Scala example. The extra code might seem complex, but most of it relates to the Java code required to sort the collected pairs and extract the top result; the actual functionality is the same:
// let's find our most popular product
List<Tuple2<String, Integer>> pairs = data
    .mapToPair(strings -> new Tuple2<String, Integer>(strings[1], 1))
    .reduceByKey((i1, i2) -> i1 + i2)
    .collect();
Map<String, Integer> productCounts = new HashMap<>();
for (Tuple2<String, Integer> pair : pairs) {
    productCounts.put(pair._1, pair._2);
}
List<String> sorted = productCounts.entrySet()
    .stream()
    .sorted(Comparator.comparing(
        (Map.Entry<String, Integer> entry) -> entry.getValue()).reversed())
    .map(Map.Entry::getKey)
    .collect(Collectors.toList());
String mostPopular = sorted.get(0);
int purchases = productCounts.get(mostPopular);
System.out.println("Total purchases: " + numPurchases);
System.out.println("Unique users: " + uniqueUsers);
System.out.println("Total revenue: " + totalRevenue);
System.out.println(String.format(
    "Most popular product: %s with %d purchases", mostPopular, purchases));
}
}
As can be seen, the general structure is similar to the Scala version, apart from the extra boilerplate needed to declare variables with explicit types and to sort the collected results. It is a good exercise to work through both examples, comparing each line of Scala code with its Java counterpart, to understand how the same result is achieved in each language.
This program can be run with the following command executed from the project's base directory:
$ mvn exec:java -Dexec.mainClass="JavaApp"
You will see output that looks very similar to that of the Scala version, with identical results for the computation:
...
14/01/30 17:02:43 INFO spark.SparkContext: Job finished: collect
at JavaApp.java:46, took 0.039167 s
Total purchases: 5
Unique users: 4
Total revenue: 39.91
Most popular product: iPhone Cover with 2 purchases