Now, let's get hands-on with Spark so that we can go deeper into the core APIs and libraries. Throughout this book, I will be referring to the 2.2.1 release of Spark; however, most of the examples presented here should also work with the 2.0 release or later. I will add a note whenever an example is specific to the 2.2+ releases.
First of all, you need to download Spark from its official website (https://spark.apache.org/downloads.html). The download page should look like this:
You need JDK 1.8+ and, only if you plan to develop in Python, Python 2.7+ or 3.4+. Spark 2.2.1 supports Scala 2.11. The JDK needs to be available on your system path; alternatively, you can set the JAVA_HOME environment variable to point to a JDK installation.
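For example, on Linux you could set JAVA_HOME and add the JDK to your path as follows (the installation directory below is just an example; adjust it to your environment):
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH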
Extract the content of the downloaded archive to any local directory. Move to the $SPARK_HOME/bin directory. There, among the other executables, you will find the interactive Spark shells for Scala and Python. They are the best way to get familiar with this framework. In this chapter, I am going to present examples that you can run through these shells.
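For example, on Linux the extraction and the setup of the SPARK_HOME environment variable could look like the following (the archive name depends on the package you selected on the download page):
tar -xzf spark-2.2.1-bin-hadoop2.7.tgz
export SPARK_HOME=$(pwd)/spark-2.2.1-bin-hadoop2.7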
You can run a Scala shell using the following command:
$SPARK_HOME/bin/spark-shell
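If you want to control the master the shell connects to or the resources it uses, you can pass the standard spark-shell options; the values below are purely illustrative:
$SPARK_HOME/bin/spark-shell --master local[4] --driver-memory 2g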
If you don't specify any arguments, Spark assumes that you're running in local mode. Here's the expected console output:
Spark context Web UI available at http://10.72.0.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1518131682342).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
The web UI is available at the following URL: http://<host>:4040. From there, you can check the status of your jobs and executors.
From the console startup output, you will notice that two built-in variables, sc and spark, are available. sc represents the SparkContext (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext), which in Spark < 2.0 was the entry point for each application. Through the Spark context (and its specializations), you can get input data from data sources and create and manipulate RDDs (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), which were Spark's primary abstraction before 2.0. The RDD programming section will cover this topic and other operations in more detail. Starting from release 2.0, a new entry point, the SparkSession (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession), and a new main data abstraction, the Dataset (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), were introduced. More details on them are presented in the following sections. The SparkContext is still part of the Spark API, which ensures compatibility with existing frameworks that don't support Spark sessions, but the direction the project has taken is to move development towards the SparkSession.
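For example, you can see both entry points in action directly from the shell (the console identifiers in the output may differ on your machine):
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val ds = spark.range(10)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
The first command creates an RDD through the SparkContext, while the second creates a Dataset through the SparkSession.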
Here's an example of how to read and manipulate a text file and put it into a Dataset using the Spark shell (the file used in this example is part of the resources for the examples that are bundled with the Spark distribution):
scala> spark.read.textFile("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
res5: org.apache.spark.sql.Dataset[String] = [value: string]
The result is a Dataset instance that contains the lines of the file. You can then perform several operations on this Dataset, such as counting the number of lines:
scala> res5.count()
res6: Long = 3
You can also get the first line of the Dataset:
scala> res5.first()
res7: String = Michael, 29
In this example, we used a path on the local filesystem. In such cases, the file must be accessible from the same path by all of the workers, so you will need to either copy the file to every worker or use a network-mounted shared filesystem.
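Alternatively, you can point to a path on a distributed storage layer such as HDFS, so that every worker resolves the same location (the namenode host, port, and path below are hypothetical):
scala> spark.read.textFile("hdfs://namenode:8020/data/people.txt")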
To close a shell, you can type the following:
:quit
To see the list of all of the available shell commands, type the following:
scala> :help
All commands can be abbreviated, for example, :he instead of :help.
The following is the list of commands:
Command | Purpose
:edit <id>|<line> | Edit history
:help [command] | Print general or command-specific help
:history [num] | Show history (the optional num is the number of commands to show)
:h? <string> | Search the history
:imports [name name ...] | Show import history, identifying the sources of names
:implicits [-v] | Show the implicits in scope
:javap <path|class> | Disassemble a file or class name
:line <id>|<line> | Place line(s) at the end of history
:load <path> | Interpret lines in a file
:paste [-raw] [path] | Enter paste mode or paste a file
:power | Enable power user mode
:quit | Exit the interpreter
:replay [options] | Reset the REPL and replay all previous commands
:require <path> | Add a jar to the classpath
:reset [options] | Reset the REPL to its initial state, forgetting all session entries
:save <path> | Save the replayable session to a file
:sh <command line> | Run a shell command (the result is implicitly => List[String])
:settings <options> | Update compiler options, if possible; see :reset
:silent | Disable or enable the automatic printing of results
:type [-v] <expr> | Display the type of an expression without evaluating it
:kind [-v] <expr> | Display the kind of an expression's type
:warnings | Show the suppressed warnings from the most recent line that had any
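For example, the :type command lets you check the type of any value in the session without evaluating it; running it on the built-in spark variable should print something like the following:
scala> :type spark
org.apache.spark.sql.SparkSession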
As with Scala, an interactive shell is also available for Python. You can run it using the following command:
$SPARK_HOME/bin/pyspark
A built-in variable named spark representing the SparkSession is available. You can do the same things as for the Scala shell:
>>> textFileDf = spark.read.text("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
>>> textFileDf.count()
3
>>> textFileDf.first()
Row(value='Michael, 29')
Unlike Java and Scala, Python is dynamically rather than statically typed, so the strongly typed Dataset API isn't available in it. A Dataset in Python is therefore always a Dataset[Row], which is referred to as a DataFrame, consistent with the DataFrame concept of the pandas library (https://pandas.pydata.org/).
To close a Python shell, you can type the following:
quit()
Interactive shells aren't the only choice for running code in Spark. It is also possible to implement self-contained applications. Here's an example of reading and manipulating a file in Scala:
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "/usr/spark-2.2.1/examples/src/main/resources/people.txt"
    val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
Applications should define a main() method instead of extending scala.App. Note the code to create SparkSession:
val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
It follows the builder factory design pattern.
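The builder also lets you chain additional configuration settings before the session is created; the configuration value below is only an illustration:
val spark = SparkSession.builder
  .master("local[*]")
  .appName("Simple Application")
  .config("spark.sql.shuffle.partitions", "4")
  .getOrCreate()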
Always explicitly close the session before ending the program execution:
spark.stop()
To build the application, you can use a build tool of your choice (Maven, sbt, or Gradle), adding the dependencies from Spark 2.2.1 and Scala 2.11. Once a JAR file has been generated, you can use the $SPARK_HOME/bin/spark-submit command to execute it, specifying the JAR filename, the Spark master URL, and a list of optional parameters, including the job name, the main class, the maximum memory to be used by each executor, and many others.
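For example, assuming the build produced a JAR named simple-application.jar (the JAR name is hypothetical and depends on your build configuration), a submission in local mode could look like this:
$SPARK_HOME/bin/spark-submit \
  --class SimpleApp \
  --master local[2] \
  --executor-memory 1g \
  simple-application.jar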
The same self-contained application could have been implemented in Python as well:
from pyspark.sql import SparkSession

logFile = "/usr/spark-2.2.1/examples/src/main/resources/people.txt"
spark = SparkSession.builder.appName("Simple Application").master("local").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()
This can be saved in a .py file and submitted through the same $SPARK_HOME/bin/spark-submit command for execution.
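For example, assuming the code was saved in a file named simple_app.py (a hypothetical filename), you could run the following:
$SPARK_HOME/bin/spark-submit --master local[2] simple_app.py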