Hands-On Deep Learning with Apache Spark
Hands-On Deep Learning with Apache Spark: Build and deploy distributed deep learning applications on Apache Spark


The Apache Spark Ecosystem

Apache Spark (http://spark.apache.org/) is an open source, fast cluster-computing platform. It was originally created by AMPLab at the University of California, Berkeley. Its source code was later donated to the Apache Software Foundation (https://www.apache.org/). Spark owes much of its speed to loading data into distributed memory (RAM) across a cluster of machines. Data can not only be transformed quickly, but also cached on demand for a variety of use cases. Compared to Hadoop MapReduce, it can run programs up to 100 times faster when the data fits in memory, or up to 10 times faster on disk. Spark provides support for four programming languages: Java, Scala, Python, and R. This book covers the Spark APIs (and deep learning frameworks) for Scala (https://www.scala-lang.org/) and Python (https://www.python.org/) only.

This chapter will cover the following topics:

  • Apache Spark fundamentals
  • Getting Spark
  • Resilient Distributed Dataset (RDD) programming
  • Spark SQL, Datasets, and DataFrames
  • Spark Streaming
  • Cluster mode using a different manager

Apache Spark fundamentals

This section covers the Apache Spark fundamentals. It is important to become very familiar with the concepts that are presented here before moving on to the next chapters, where we'll be exploring the available APIs.

As mentioned in the introduction to this chapter, the Spark engine processes data in distributed memory across the nodes of a cluster. The following diagram shows the logical structure of how a typical Spark job processes information:

Figure 1.1

Spark executes a job in the following way:

Figure 1.2

The Master controls how data is partitioned and takes advantage of data locality while keeping track of all the distributed data computation on the Slave machines. If a Slave machine becomes unavailable, the data on that machine is reconstructed on one or more of the other available machines. In standalone mode, the Master is a single point of failure. This chapter's Cluster mode using different managers section covers the possible running modes and explains fault tolerance in Spark.

Spark comes with five major components:

Figure 1.3

These components are as follows:

  • The core engine.
  • Spark SQL: A module for structured data processing.
  • Spark Streaming: This extends the core Spark API. It allows live data stream processing. Its strengths include scalability, high throughput, and fault tolerance.
  • MLlib: The Spark machine learning library.
  • GraphX: Graphs and graph-parallel computation algorithms.

Spark can access data that's stored in different systems, such as HDFS, Cassandra, MongoDB, relational databases, and also cloud storage services such as Amazon S3 and Azure Data Lake Storage.

Getting Spark

Now, let's get hands-on with Spark so that we can go deeper into the core APIs and libraries. In all of the chapters of this book, I will be referring to the 2.2.1 release of Spark. However, several of the examples that are presented here should work with the 2.0 release or later. I will add a note when an example applies to 2.2+ releases only.

First of all, you need to download Spark from its official website (https://spark.apache.org/downloads.html). The download page should look like this:

Figure 1.4

You need to have JDK 1.8+ and, only if you plan to develop in Python, Python 2.7+ or 3.4+. Spark 2.2.1 supports Scala 2.11. The JDK needs to be on your PATH; alternatively, you can set the JAVA_HOME environment variable to point to a JDK installation.

Extract the content of the downloaded archive to any local directory. Move to the $SPARK_HOME/bin directory. There, among the other executables, you will find the interactive Spark shells for Scala and Python. They are the best way to get familiar with this framework. In this chapter, I am going to present examples that you can run through these shells.

You can run a Scala shell using the following command:

$SPARK_HOME/bin/spark-shell

If you don't specify an argument, Spark assumes that you're running in local mode, using as many worker threads as there are logical cores on your machine (master = local[*]). Here's the expected output to the console:

Spark context Web UI available at http://10.72.0.2:4040
Spark context available as 'sc' (master = local[*], app id = local-1518131682342).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

The web UI is available at the following URL: http://<host>:4040.

It will give you the following output:

Figure 1.5

From there, you can check the status of your jobs and executors.

From the output of the console startup, you will notice that two built-in variables, sc and spark, are available. sc represents the SparkContext (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext), which in Spark < 2.0 was the entry point for each application. Through the Spark context (and its specializations), you can get input data from data sources and create and manipulate RDDs (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD), which were the primary Spark abstraction before 2.0. The RDD programming section will cover this topic and other operations in more detail. Starting from release 2.0, a new entry point, SparkSession (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SparkSession), and a new main data abstraction, the Dataset (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset), were introduced. More details on them are presented in the following sections. The SparkContext is still part of the Spark API to ensure compatibility with existing frameworks that don't support Spark sessions, but the project's direction is to move development toward the SparkSession.

Here's an example of how to read and manipulate a text file and put it into a Dataset using the Spark shell (the file used in this example is part of the resources for the examples that are bundled with the Spark distribution):

scala> spark.read.textFile("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
res5: org.apache.spark.sql.Dataset[String] = [value: string]

The result is a Dataset instance that contains the file lines. You can then make several operations on this Dataset, such as counting the number of lines:

scala> res5.count()
res6: Long = 3

You can also get the first line of the Dataset:

scala> res5.first()
res7: String = Michael, 29

In this example, we used a path on the local filesystem. In these cases, the file should be accessible from the same path by all of the workers, so you will need to copy the file across all workers or use a network-mounted shared filesystem.

To close a shell, you can type the following:

:quit

To see the list of all of the available shell commands, type the following:

scala> :help

All commands can be abbreviated, for example, :he instead of :help.

The following is the list of commands:

Commands Purpose
:edit <id>|<line> Edit history
:help [command] Prints summary or command-specific help
:history [num] Shows history (optional num is commands to show)
:h? <string> Search history
:imports [name name ...] Show import history, identifying the sources of names
:implicits [-v] Show the implicits in scope
:javap <path|class> Disassemble a file or class name
:line <id>|<line> Place line(s) at the end of history
:load <path> Interpret lines in a file
:paste [-raw] [path] Enter paste mode or paste a file
:power Enable power user mode
:quit Exit the interpreter
:replay [options] Reset the repl and replay on all previous commands
:require <path> Add a jar to the classpath
:reset [options] Reset the repl to its initial state, forgetting all session entries
:save <path> Save the replayable session to a file
:sh <command line> Run a shell command (the result is implicitly => List[String])
:settings <options> Update compiler options, if possible; see reset
:silent Disable or enable the automatic printing of results
:type [-v] <expr> Display the type of expression without evaluating it
:kind [-v] <expr> Display the kind of expression
:warnings Show the suppressed warnings from the most recent line that had any

Like Scala, an interactive shell is available for Python. You can run it using the following command:

$SPARK_HOME/bin/pyspark

A built-in variable named spark representing the SparkSession is available. You can do the same things as for the Scala shell:

>>> textFileDf = spark.read.text("/usr/spark-2.2.1/examples/src/main/resources/people.txt")
>>> textFileDf.count()
3
>>> textFileDf.first()
Row(value='Michael, 29')

Unlike Java and Scala, Python is dynamically typed, so it cannot benefit from compile-time type safety. Therefore, a Dataset in Python is a Dataset[Row], but you can call it a DataFrame so that it's consistent with the DataFrame concept of the Pandas framework (https://pandas.pydata.org/).

To close a Python shell, you can type the following:

quit()

Interactive shells aren't the only choice for running code in Spark. It is also possible to implement self-contained applications. Here's an example of reading and manipulating a file in Scala:

import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    // Sample file bundled with the Spark distribution
    val logFile = "/usr/spark-2.2.1/examples/src/main/resources/people.txt"
    // Create (or reuse) a session that runs locally
    val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()
    // Read the file as a Dataset[String] and cache it, since it is scanned twice
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}

Applications should define a main() method instead of extending scala.App. Note the code to create SparkSession:

val spark = SparkSession.builder.master("local").appName("Simple Application").getOrCreate()

It follows the builder design pattern.

Always explicitly close the session before ending the program execution:

spark.stop()

To build the application, you can use a build tool of your choice (Maven, sbt, or Gradle), adding the dependencies from Spark 2.2.1 and Scala 2.11. Once a JAR file has been generated, you can use the $SPARK_HOME/bin/spark-submit command to execute it, specifying the JAR filename, the Spark master URL, and a list of optional parameters, including the job name, the main class, the maximum memory to be used by each executor, and many others.

The same self-contained application could have been implemented in Python as well:

from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
# builder is an attribute, not a method; the app name and master are set explicitly here
spark = SparkSession.builder.appName("Simple Application").master("local").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

This can be saved in a .py file and submitted through the same $SPARK_HOME/bin/spark-submit command for execution.

RDD programming

In general, every Spark application is a driver program that runs the logic that has been implemented for it and executes parallel operations on a cluster. Accordingly, the main abstraction provided by the core Spark framework is the RDD. It is an immutable distributed collection of data that is partitioned across machines in a cluster. Operations on RDDs can happen in parallel.

Two types of operations are available on an RDD:

  • Transformations
  • Actions

A transformation is an operation on an RDD that produces another RDD, while an action is an operation that triggers some computation and then returns a value to the driver or persists the result to a storage system. Transformations are lazy: they aren't executed until an action is invoked. This is one of Spark's key strengths: it keeps track of the transformations that have been applied to an RDD (its lineage), so if a partition is lost (for example, a slave goes down), it can be easily rebuilt on some other node of the cluster.
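To make the lazy behavior concrete, here is a minimal plain-Python sketch. It is not the Spark API: LazyCollection, double, and calls are hypothetical names introduced only for illustration. Transformations merely record a step, and the action replays the recorded steps over the source data, which is also how lost data can be rebuilt:

```python
class LazyCollection:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, lineage=()):
        self._source = source      # the original data (a plain list here)
        self._lineage = lineage    # tuple of ("map" | "filter", function) steps

    # Transformations: return a new object and compute nothing.
    def map(self, func):
        return LazyCollection(self._source, self._lineage + (("map", func),))

    def filter(self, func):
        return LazyCollection(self._source, self._lineage + (("filter", func),))

    # Action: replays the recorded lineage over the source data.
    def collect(self):
        data = list(self._source)
        for kind, func in self._lineage:
            if kind == "map":
                data = [func(x) for x in data]
            else:
                data = [x for x in data if func(x)]
        return data


calls = []

def double(x):
    calls.append(x)    # side effect that reveals when the function really runs
    return x * 2

numbers = LazyCollection([1, 2, 3, 4])
evens_doubled = numbers.filter(lambda x: x % 2 == 0).map(double)
calls_before_action = len(calls)     # still 0: no transformation has run yet
result = evens_doubled.collect()     # the action triggers the computation
rebuilt = evens_doubled.collect()    # replaying the lineage rebuilds the same data
```

The second collect() stands in for recovery after a failure: because the lineage is kept, the same result can be recomputed from the source at any time.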

The following table lists some of the common transformations supported by Spark:

Transformation Purpose
map(func) Returns a new RDD by applying the func function on each data element of the source RDD.
filter(func) Returns a new RDD by selecting those data elements for which the applied func function returns true.
flatMap(func) This transformation is similar to map: the difference is that each input item can be mapped to zero or multiple output items (the applied func function should return a Seq).
union(otherRdd) Returns a new RDD that contains the union of the elements in the source RDD and the otherRdd argument.
distinct([numPartitions]) Returns a new RDD that contains only the distinct elements of the source RDD.
groupByKey([numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, Iterable<V>) pairs. By default, the level of parallelism in the output RDD depends on the number of partitions of the source RDD. You can pass an optional numPartitions argument to set a different number of partitions.
reduceByKey(func, [numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs, where the values for each key are aggregated using the given reduce func function, which must be of type (V,V) => V. The same as for the groupByKey transformation, the number of reduce partitions is configurable through an optional numPartitions second argument.
sortByKey([ascending], [numPartitions]) When called on an RDD of (K, V) pairs, it returns an RDD of (K, V) pairs sorted by keys in ascending or descending order, as specified in the Boolean ascending argument. The number of partitions for the output RDD is configurable through an optional numPartitions second argument.
join(otherRdd, [numPartitions]) When called on RDDs of type (K, V) and (K, W), it returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key. It supports left outer join, right outer join, and full outer join. The number of partitions for the output RDD is configurable through an optional numPartitions second argument.
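The semantics of the key-based transformations in the table can be sketched on ordinary Python lists of pairs. This is a single-machine model, not Spark code: group_by_key, reduce_by_key, and join are hypothetical helper functions, and the sample data is made up for illustration:

```python
from collections import defaultdict

def group_by_key(pairs):
    """(K, V) pairs -> {K: [V, ...]}, like groupByKey."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)

def reduce_by_key(pairs, func):
    """(K, V) pairs -> {K: V}, folding the values of each key with func, like reduceByKey."""
    reduced = {}
    for key, value in pairs:
        reduced[key] = func(reduced[key], value) if key in reduced else value
    return reduced

def join(left, right):
    """(K, V) and (K, W) pairs -> (K, (V, W)) for every matching pair (inner join)."""
    right_grouped = group_by_key(right)
    return [(k, (v, w)) for k, v in left for w in right_grouped.get(k, [])]

sales = [("apple", 3), ("pear", 1), ("apple", 2)]
prices = [("apple", 0.5), ("pear", 0.8)]

grouped = group_by_key(sales)                         # {'apple': [3, 2], 'pear': [1]}
totals = reduce_by_key(sales, lambda a, b: a + b)     # {'apple': 5, 'pear': 1}
joined = join(sales, prices)
```

In Spark, the same operations run per partition and involve a shuffle across the cluster, which this model leaves out.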

The following table lists some of the common actions supported by Spark:

Action Purpose
reduce(func) Aggregates the elements of an RDD using a given function, func (this takes two arguments and returns one). To ensure the correct parallelism at compute time, the reduce function, func, has to be commutative and associative.
collect() Returns all the elements of an RDD as an array to the driver.
count() Returns the total number of elements in an RDD.
first() Returns the first element of an RDD.
take(n) Returns an array containing the first n elements of an RDD.
foreach(func) Executes the func function on each element of an RDD.
saveAsTextFile(path) Writes the elements of an RDD as a text file in a given directory (with the absolute location specified through the path argument) in the local filesystem, HDFS, or any other Hadoop-supported filesystem.
countByKey() This action is only available on RDDs of type (K, V). It returns a hashmap of (K, Int) pairs, where K is a key of the source RDD and its value is the count for that given key, K.
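The requirement that the function passed to reduce be commutative and associative can be checked with a small plain-Python model. Here, distributed_reduce is a hypothetical helper that reduces each partition separately and then combines the partial results, which approximates how a parallel reduce proceeds:

```python
from functools import reduce

def distributed_reduce(partitions, func):
    """Reduces each non-empty partition on its own, then reduces the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

# The same six numbers, partitioned in two different ways.
split_a = [[1, 2, 3], [4, 5, 6]]
split_b = [[1], [2, 3, 4, 5], [6]]

add = lambda x, y: x + y    # commutative and associative
sub = lambda x, y: x - y    # neither commutative nor associative

sum_a = distributed_reduce(split_a, add)
sum_b = distributed_reduce(split_b, add)
diff_a = distributed_reduce(split_a, sub)
diff_b = distributed_reduce(split_b, sub)
```

With addition, both partitionings give the same total. With subtraction, the result depends on how the data happens to be split, so it would not be a safe choice for reduce.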

Now, let's understand the concepts of transformation and action through an example that could be executed in the Scala shell—this finds the N most commonly used words in an input text file. The following diagram depicts a potential implementation for this problem:

Figure 1.6

Let's translate this into code.

First of all, let's load the content of a text file into an RDD of strings:

scala> val spiderman = sc.textFile("/usr/spark-2.2.1/tests/spiderman.txt")
spiderman: org.apache.spark.rdd.RDD[String] = /usr/spark-2.2.1/tests/spiderman.txt MapPartitionsRDD[1] at textFile at <console>:24

Then, we will apply the necessary transformations and actions:

scala> val topWordCount = spiderman.flatMap(str=>str.split(" ")).filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case(word, count) => (count, word)}.sortByKey(false)
topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[9] at sortByKey at <console>:26

Here, we have the following:

  • flatMap(str=>str.split(" ")): Splits each line into single words
  • filter(!_.isEmpty): Removes empty strings
  • map(word=>(word,1)): Maps each word into a key-value pair
  • reduceByKey(_+_): Aggregates the count
  • map{case(word, count) => (count, word)}: Reverses the (word, count) pairs to (count, word)
  • sortByKey(false): Sorts by descending order

Finally, print the five most used words in the input content to the console:

scala> topWordCount.take(5).foreach(x=>println(x))
(34,the)
(28,and)
(19,of)
(19,in)
(16,Spider-Man)

The same could be achieved in Python in the following way:

from operator import add

spiderman = spark.read.text("/usr/spark-2.2.1/tests/spiderman.txt")
lines = spiderman.rdd.map(lambda r: r[0])  # extract the text from each Row
counts = lines.flatMap(lambda x: x.split(' ')) \
    .filter(lambda x: x != '') \
    .map(lambda x: (x, 1)) \
    .reduceByKey(add) \
    .map(lambda x: (x[1], x[0])) \
    .sortByKey(False)

The result is the same as for the Scala example, although words with equal counts may appear in a different order:

>>> counts.take(5)
[(34, 'the'), (28, 'and'), (19, 'in'), (19, 'of'), (16, 'Spider-Man')]
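To inspect what each step of the pipeline produces, the same sequence can be reproduced on a small in-memory sample with plain Python. This is only a single-machine model of the steps (the sample lines are made up, and no Spark API is involved):

```python
from collections import Counter

# Made-up sample text; note the double space in the second line.
lines = ["the cat and the hat", "the  dog and a bone"]

words = [w for line in lines for w in line.split(" ")]          # flatMap: lines to words
words = [w for w in words if w]                                 # filter: drop empty strings
pairs = [(w, 1) for w in words]                                 # map: (word, 1) pairs
counts = Counter()
for word, one in pairs:                                         # reduceByKey: sum per word
    counts[word] += one
swapped = [(count, word) for word, count in counts.items()]     # map: swap to (count, word)
top = sorted(swapped, key=lambda pair: pair[0], reverse=True)   # sortByKey, descending
top_two = top[:2]                                               # take(2)
```

Without the filter step, the double space would produce an empty string that gets counted as a word, which is why the pipeline removes empty strings before counting.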

Spark can persist RDDs (and Datasets as well) in memory while executing operations on them. Persisting and caching are synonyms in Spark. When persisting an RDD, each node of the cluster stores the RDD partitions that it computes in memory and reuses them in further actions on the same RDD (or RDDs that have been derived from it through some transformations). This is the reason why future actions execute much faster.

It is possible to mark an RDD to be persisted using its persist() method. The first time an action is executed on it, it will be kept in memory on the cluster's nodes. The Spark cache is fault-tolerant: if for any reason a partition of an RDD is lost, it will be automatically recalculated using the transformations that created it.

A persisted RDD can be stored using different storage levels. Levels can be set by passing a StorageLevel object to the persist() method of the RDD. The following table lists all of the available storage levels and their meanings:

Storage Level Purpose
MEMORY_ONLY This is the default storage level. It stores RDDs as deserialized Java objects in memory. If an RDD doesn't fit in memory, some of its partitions won't be cached and will be recalculated on the fly when needed.
MEMORY_AND_DISK It stores RDDs as deserialized Java objects in memory first, but, if an RDD doesn't fit in memory, it stores the partitions that don't fit on disk (this is the main difference from MEMORY_ONLY), and reads them from there when needed.
MEMORY_ONLY_SER It stores RDDs as serialized Java objects. Compared to MEMORY_ONLY, this is more space-efficient, but more CPU-intensive in read operations. This is available for JVM languages only.
MEMORY_AND_DISK_SER It is similar to MEMORY_ONLY_SER (it stores RDDs as serialized Java objects), with the main difference being that it stores partitions that don't fit in memory to disk. This is available only for JVM languages.
DISK_ONLY It stores the RDD partitions on disk only.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, and so on The same as the corresponding preceding levels (MEMORY_ONLY, MEMORY_AND_DISK, and so on), but each partition is replicated on two cluster nodes.
OFF_HEAP Similar to MEMORY_ONLY_SER, but it stores data in off-heap memory (assuming off-heap memory is enabled). Please be careful when using this storage level as it is still experimental.

When a function is passed to a Spark operation, it is executed on a remote cluster node, where it works on separate copies of all the variables that are used in the function. These variables are copied to each machine, and no updates made to them on the remote machines are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient.

However, Spark provides two limited types of shared variables for two common usage patterns: broadcast variables and accumulators.
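The copy semantics of variables used inside functions can be imitated in plain Python. In this sketch, run_task_remotely is a hypothetical helper that stands in for shipping a task to an executor, and copy.deepcopy plays the role of serializing the captured variables:

```python
import copy

def run_task_remotely(func, captured, partition):
    """Simulates shipping a task: the executor gets its own copy of the captured variables."""
    local_vars = copy.deepcopy(captured)
    for element in partition:
        func(local_vars, element)
    return local_vars    # returned only so we can inspect it; Spark would discard this copy

def add_to_counter(variables, element):
    variables["counter"] += element

driver_vars = {"counter": 0}
partitions = [[1, 2], [3, 4]]
executor_copies = [run_task_remotely(add_to_counter, driver_vars, p) for p in partitions]
driver_counter = driver_vars["counter"]    # unchanged on the driver
```

Each simulated executor updates its own copy, while the counter on the driver stays at 0. This is the situation that accumulators are designed to handle.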

One of the most common operations in Spark programming is to perform joins on RDDs to consolidate data by a given key. In these cases, it is quite possible to have large datasets sent around to the slave nodes that host the partitions to be joined. This situation can become a huge performance bottleneck, as network I/O is much slower than RAM access. To mitigate this issue, Spark provides broadcast variables, which are broadcast to slave nodes. RDD operations on the nodes can quickly access the broadcast variable value. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication costs. Broadcast variables are created from a variable, v, by calling the SparkContext.broadcast(v) method. The broadcast variable is a wrapper around v, and its value can be obtained by calling the value method. Here's an example in Scala that you can run through the Spark shell:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

After its creation, the broadcast variable, broadcastVar, should be used in place of the initial value, v, in any function that's executed on the cluster, so that v isn't shipped to the nodes more than once. To ensure that all the nodes get the same value of the broadcast variable, v must not be modified after broadcastVar has been broadcast.

Here's the code for the same example in Python:

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
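A typical use of a broadcast variable is to replace a join against a small lookup table. The following plain-Python sketch models that idea only: the Broadcast class is a toy wrapper written for this example (not pyspark.broadcast.Broadcast), and the data and the enrich function are hypothetical:

```python
# Small lookup table that every task needs (made-up data).
country_names = {"IT": "Italy", "IE": "Ireland", "US": "United States"}

class Broadcast:
    """Toy wrapper that mimics the read-only .value access of a broadcast variable."""

    def __init__(self, value):
        self._value = value

    @property
    def value(self):
        return self._value

lookup = Broadcast(country_names)

def enrich(partition):
    """Per-partition work: resolve each country code through the broadcast lookup."""
    return [(user, lookup.value.get(code, "unknown")) for user, code in partition]

partitions = [[("anna", "IT"), ("bob", "US")], [("cian", "IE"), ("dee", "FR")]]
enriched = [row for part in partitions for row in enrich(part)]
```

Because every task reads the lookup table locally, the large side of the data never has to be moved to be matched against it.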

To aggregate information across executors in a Spark cluster, accumulator variables should be used. The fact that they are added through an associative and commutative operation ensures their efficient support in parallel computation. Spark natively provides support for the accumulators of numeric types—they can be created by calling SparkContext.longAccumulator() (to accumulate values of type Long) or SparkContext.doubleAccumulator() (to accumulate values of type Double) methods.

However, it is possible to programmatically add support for other types. Any task running on a cluster can add to an accumulator using the add method, but tasks cannot read its value. This operation is only allowed for the driver program, which uses its value method. Here's a code example in Scala:

scala> val accum = sc.longAccumulator("First Long Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(First Long Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
[Stage 0:> (0 + 0) / 8]


scala> accum.value
res1: Long = 10

In this case, an accumulator has been created and a name has been assigned to it. It is possible to create unnamed accumulators, but a named accumulator will display in the web UI for the stage that modifies that accumulator:

Figure 1.7

This can be helpful for understanding the progress of running stages.

The same example in Python is as follows:

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
>>> accum.value
10

Tracking accumulators in the web UI isn't supported for Python.

Please be aware that Spark guarantees to update accumulators inside actions only. When restarting a task, the accumulators will be updated only once. The same isn't true for transformations.
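Why updates made inside transformations can be applied more than once is easier to see with a plain-Python model. This sketch is not Spark code: Accumulator and lazy_map are hypothetical stand-ins, and the repeated evaluation imitates a transformation being recomputed, for instance by a second action or a re-executed task:

```python
class Accumulator:
    """Toy add-only counter, read on the 'driver' through .value."""

    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount

def lazy_map(func, data):
    """Returns a zero-argument function; every call recomputes the whole mapping."""
    return lambda: [func(x) for x in data]

in_transformation = Accumulator()

def tag(x):
    in_transformation.add(1)    # update performed inside a transformation
    return x * 10

mapped = lazy_map(tag, [1, 2, 3])
first = mapped()     # a first action computes the map
second = mapped()    # a second action recomputes it, since nothing was cached

in_action = Accumulator()
for x in [1, 2, 3]:  # foreach-style action: each element is visited once
    in_action.add(1)
```

The accumulator updated inside the recomputed map ends up at 6 for three elements, while the one updated in the action ends at 3. Counts that must be exact are therefore safer to update inside actions.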

Spark SQL, Datasets, and DataFrames

Spark SQL is the Spark module for structured data processing. The main difference between this API and the RDD API is that the provided Spark SQL interfaces give more information about the structure of both the data and the performed computation. This extra information is used by Spark internally to add extra optimizations through the Catalyst optimization engine, which is the same execution engine that's used regardless of whatever API or programming language is involved.

Spark SQL is commonly used to execute SQL queries (even if this isn't the only way to use it). Whatever programming language supported by Spark encapsulates the SQL code to be executed, the results of a query are returned as a Dataset. A Dataset is a distributed collection of data, and was added as an interface in Spark 1.6. It combines the benefits of RDDs (such as strong typing and the ability to apply useful lambda functions) with the benefits of Spark SQL's optimized execution engine (Catalyst, https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). You can construct a Dataset by starting with Java/Scala objects and then manipulating it through the usual functional transformations. The Dataset API is available in Scala and Java, while Python doesn't have support for it. However, due to the dynamic nature of this programming language, many of the benefits of the Dataset API are already available for it.
Starting from Spark 2.0, the DataFrame and Dataset APIs have been merged into the Dataset API, so a DataFrame is just a Dataset that's been organized into named columns and is conceptually equivalent to a table in an RDBMS, but with better optimizations under the hood (being part of the Dataset API, the Catalyst optimization engine works behind the scenes for DataFrames, too). You can construct a DataFrame from diverse sources, such as structured data files, Hive tables, database tables, and RDDs, to name a few. Unlike the Dataset API, the DataFrame API is available in any of the programming languages that are supported by Spark.

Let's start and get hands-on so that we can better understand the concepts behind Spark SQL. The first full example I am going to show is Scala-based. Start a Scala Spark shell to run the following code interactively.

Let's use people.json as a data source. This file is shipped with the Spark distribution among the resources for the examples, and it can be used to create a DataFrame, that is, a Dataset of Rows (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row):

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can print the content of the DataFrame to the console to check that it is what you expected:

scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Before you perform DataFrame operations, you need to import the implicit conversions (such as converting RDDs to DataFrames), which also enable the $ notation:

import spark.implicits._

Now, you can print the DataFrame schema in a tree format:

scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

Select a single column (let's say name):

scala> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

Filter the data:

scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

Then add a groupBy clause:

scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

Select all rows and increment a numeric field:

scala> df.select($"name", $"age" + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+
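The null handling visible in these outputs can be mimicked on a plain list of dictionaries. The following sketch is a single-machine model rather than the DataFrame API: it mirrors the three records of people.json shown above, with None standing in for null:

```python
from collections import Counter

# The three records shown in the outputs above; None stands in for null.
people = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]

# select("name")
names = [p["name"] for p in people]

# filter(age > 27): a null age never satisfies the comparison
older = [p for p in people if p["age"] is not None and p["age"] > 27]

# groupBy("age").count(): null forms a group of its own
by_age = Counter(p["age"] for p in people)

# select(name, age + 1): arithmetic on null yields null
incremented = [(p["name"], None if p["age"] is None else p["age"] + 1) for p in people]
```

The explicit None checks spell out what the DataFrame operations do implicitly: comparisons with null filter the row out, and expressions over null stay null.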

It is possible to run SQL queries programmatically through the sql function of SparkSession. This function returns the results of the query in a DataFrame, which, for Scala, is a Dataset[Row]. Let's consider the same DataFrame as for the previous example:

val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

You can register it as an SQL temporary view:

df.createOrReplaceTempView("people")

Then, you can execute an SQL query there:

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

The same things can be done in Python as well:

>>> df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")

Resulting in the following:

>>> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

>>> df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+

>>> df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

>>> df.select(df['name'], df['age'] + 1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.createOrReplaceTempView("people")
>>> sqlDF = spark.sql("SELECT * FROM people")
>>> sqlDF.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

Other features of Spark SQL and Datasets (data sources, aggregations, self-contained applications, and so on) will be covered in Chapter 3, Extract, Transform, Load.

Spark Streaming

Spark Streaming is another Spark module that extends the core Spark API and provides a scalable, fault-tolerant, and efficient way of processing live streaming data. By converting streaming data into micro batches, Spark's simple batch programming model can be applied in streaming use cases too. This unified programming model makes it easy to combine batch and interactive data processing with streaming. Data can be ingested from diverse sources (Kafka, Kinesis, TCP sockets, S3, or HDFS, just to mention a few of the popular ones) and processed using any of the high-level functions available in Spark. Finally, the processed data can be persisted to RDBMS, NoSQL databases, HDFS, object storage systems, and so on, or consumed through live dashboards. Nothing prevents other advanced Spark components, such as MLlib or GraphX, from being applied to data streams:

Figure 1.8

The following diagram shows how Spark Streaming works internally—it receives live input data streams and divides them into batches; these are processed by the Spark engine to generate the final batches of results:

Figure 1.9
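The discretization step can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's implementation: the discretize helper and the (timestamp, value) input format are assumptions made purely for illustration. It groups timestamped records into consecutive batches of a fixed interval, which is the role the batch interval plays in Spark Streaming.

```python
from collections import defaultdict


def discretize(records, batch_interval):
    """Group (timestamp, value) records into consecutive fixed-size batches.

    Batch i holds the values whose timestamp t satisfies
    i * batch_interval <= t < (i + 1) * batch_interval.
    Intervals without data still yield an empty batch.
    """
    buckets = defaultdict(list)
    for timestamp, value in records:
        buckets[int(timestamp // batch_interval)].append(value)
    if not buckets:
        return []
    last = max(buckets)
    return [buckets.get(i, []) for i in range(last + 1)]


# Hypothetical input: lines of text arriving at the given times (in seconds)
events = [(0.2, "a b"), (0.7, "b c"), (1.1, "c"), (3.5, "a")]
batches = discretize(events, batch_interval=1)
print(batches)  # [['a b', 'b c'], ['c'], [], ['a']]
```

Each inner list plays the part of one RDD in the sequence that makes up a DStream.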

The higher-level abstraction of Spark Streaming is the DStream (short for Discretized Stream), which is a wrapper around a continuous flow of data. Internally, a DStream is represented as a sequence of RDDs. A DStream contains a list of other DStreams that it depends on, a function to convert its input RDDs into output ones, and a time interval at which to invoke the function. DStreams are created by either manipulating existing ones, for example, applying a map or filter function (which internally creates MappedDStreams and FilteredDStreams, respectively), or by reading from an external source (the base class in these cases is InputDStream).

Let's implement a simple Scala example—a streaming word count self-contained application. The code used for this class can be found among the examples that are bundled with the Spark distribution. To compile and package it, you need to add the dependency to Spark Streaming to your Maven, Gradle, or sbt project descriptor, along with the dependencies from Spark Core and Scala.

First, we have to create the SparkConf and a StreamingContext (which is the main entry point for any streaming functionality) from it:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(1))

The batch interval has been set to 1 second. A DStream representing streaming data from a TCP source can be created using the ssc streaming context; we just need to specify the source hostname and port, as well as the desired storage level:

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

The returned lines DStream is the stream of data that is going to be received from the server. Each record will be a single line of text that we want to split into single words, thus specifying the space character as a separator:

val words = lines.flatMap(_.split(" "))

Then, we will count those words:

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

The words DStream is mapped (a one-to-one transformation) to a DStream of (word, 1) pairs, which is then reduced to get the frequency of words in each batch of data. The last command will print a few of the counts that are generated every second. Each RDD in a DStream contains data from a certain interval; any operation applied on a DStream translates to operations on the underlying RDDs:

Figure 1.10
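To make the per-batch semantics concrete, here is a plain-Python sketch (no Spark involved; count_words_in_batch is a hypothetical helper) that applies the same split, pair, and reduce steps to each batch independently. Note that counts do not carry over from one batch to the next.

```python
from collections import Counter


def count_words_in_batch(lines):
    """Mimic flatMap(split) -> map((word, 1)) -> reduceByKey(+) for one batch."""
    words = [word for line in lines for word in line.split(" ")]  # flatMap
    pairs = [(word, 1) for word in words]                          # map
    counts = Counter()
    for word, one in pairs:                                        # reduceByKey
        counts[word] += one
    return dict(counts)


# Two hypothetical batches, one per batch interval
batches = [["hello world", "hello spark"], ["spark streaming"]]
per_batch = [count_words_in_batch(batch) for batch in batches]
print(per_batch)
# [{'hello': 2, 'world': 1, 'spark': 1}, {'spark': 1, 'streaming': 1}]
```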

To start the processing after all the transformations have been set up, use the following code:

ssc.start()
ssc.awaitTermination()

Before running this example, first you will need to run netcat (a small utility found in most Unix-like systems) as a data server:

nc -lk 9999

Then, in a different Terminal, you can start the example by passing the following as arguments:

localhost 9999

Any line typed into the Terminal running the netcat server will be counted and printed on the application screen every second.

If nc isn't available on the system where you run this example, you can implement your own data server in Scala:

import java.io.PrintWriter
import java.net.ServerSocket
import java.util.Scanner

object SocketWriter {
  def main(args: Array[String]): Unit = {
    val listener = new ServerSocket(9999)
    // Block until the streaming application connects
    val socket = listener.accept()

    // socketTextStream reads newline-delimited text, so write plain lines
    val out = new PrintWriter(socket.getOutputStream, true)
    println("Start writing data. Enter close when finished")
    val sc = new Scanner(System.in)
    // Read lines from standard input and forward them until "close" is entered
    var str = sc.nextLine()
    while (str != "close") {
      out.println(str)
      str = sc.nextLine()
    }
    // Close the connection now
    out.close()
    socket.close()
    listener.close()
  }
}

The same self-contained application in Python could be as follows:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    # Batch interval of 1 second
    ssc = StreamingContext(sc, 1)

    # Split each line into words, pair each word with 1, and sum per word
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

DStreams support most of the transformations that are available for RDDs. This means that data from input DStreams can be modified in the same way as the data in RDDs. The following table lists some of the common transformations supported by Spark DStreams:

Transformation Purpose
map(func) Returns a new DStream. The func map function is applied to each element of the source DStream.
flatMap(func) The same as for map. The only difference is that each input item in the new DStream can be mapped to 0 or more output items.
filter(func) Returns a new DStream containing only the elements of the source DStream for which the func filter function returned true.
repartition(numPartitions) This is used to set the level of parallelism by creating a different number of partitions.
union(otherStream) Returns a new DStream. It contains the union of the elements in the source DStream and the otherStream input DStream.
count() Returns a new DStream. It contains single element RDDs that are obtained by counting the number of elements contained in each RDD arriving from the source.
reduce(func) Returns a new DStream. It contains single element RDDs that are obtained by aggregating those in each RDD of the source by applying the func function (which should be associative and commutative to allow for correct parallel computation).
countByValue() Returns a new DStream of (K, Long) pairs, where K is the type of the elements of the source. The value of each key represents its frequency in each RDD of the source.
reduceByKey(func, [numTasks]) Returns a new DStream of (K, V) pairs (for a source DStream of (K, V) pairs). The values for each key are aggregated by applying the reduce func function. To do the grouping, this transformation uses Spark's default number of parallel tasks (which is two in local mode, while it is determined by the config property spark.default.parallelism in cluster mode), but this can be changed by passing an optional numTasks argument.
join(otherStream, [numTasks]) Returns a new DStream of (K, (V, W)) pairs when called on two DStreams of (K, V) and (K, W) pairs, respectively.
cogroup(otherStream, [numTasks]) Returns a new DStream of (K, Seq[V], Seq[W]) tuples when called on two DStreams of (K, V) and (K, W) pairs, respectively.
transform(func) Returns a new DStream. It applies an RDD-to-RDD func function to every RDD of the source.
updateStateByKey(func) Returns a new state DStream. The state for each key in the new DStream is updated by applying the func input function to the previous state and the new values for the key.
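Unlike the other transformations in the table, updateStateByKey carries information from one batch to the next. The following plain-Python sketch (no Spark involved; both function names are hypothetical) shows the idea for a running count. It is a simplification: it passes the new values first and the previous state second, and it doesn't model the removal of keys from the state.

```python
def update_state_by_key(batches, update_func):
    """Yield the per-key state after each batch of (key, value) pairs."""
    state = {}
    for batch in batches:
        new_values = {}
        for key, value in batch:
            new_values.setdefault(key, []).append(value)
        # Every key seen so far is updated, even if it has no new values
        for key in set(state) | set(new_values):
            state[key] = update_func(new_values.get(key, []), state.get(key))
        yield dict(state)


def running_count(new_values, previous):
    """Add the new values of a key to its previous count (None on first sight)."""
    return sum(new_values) + (previous or 0)


batches = [[("a", 1), ("b", 1), ("a", 1)], [("b", 1)], [("c", 1)]]
states = list(update_state_by_key(batches, running_count))
print(states[-1] == {"a": 2, "b": 2, "c": 1})  # True
```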

Windowed computations are provided by Spark Streaming. As shown in the following diagram, they allow you to apply transformations over sliding windows of data:

Figure 1.11

When a window slides over a source DStream, all its RDDs that fall within that window are taken into account and transformed to produce the RDDs of the returned windowed DStream. Looking at the specific example that's shown in the preceding diagram, the window-based operation is applied over three time units of data and it slides by two. Two parameters need to be specified by any window operation that's used:

  • Window length: The duration of the window
  • Sliding interval: The interval at which the window operation is performed

These two parameters must be multiples of the batch interval of the source DStream.
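The interplay between the two parameters can be sketched in plain Python. This is a simplified model rather than Spark code: sliding_windows is a hypothetical helper, and both parameters are expressed as a number of batches. It uses the same values as the diagram, a window of three time units that slides by two.

```python
def sliding_windows(batches, window_length, slide_interval):
    """Return the windows produced over a list of batches.

    window_length and slide_interval are expressed in number of batches,
    which is why both must be multiples of the batch interval.
    A window is emitted every slide_interval batches and covers the
    last window_length batches available at that time.
    """
    windows = []
    for end in range(slide_interval, len(batches) + 1, slide_interval):
        start = max(0, end - window_length)
        windows.append(batches[start:end])
    return windows


# Six batches, labeled by their time unit
batches = ["t1", "t2", "t3", "t4", "t5", "t6"]
print(sliding_windows(batches, window_length=3, slide_interval=2))
# [['t1', 't2'], ['t2', 't3', 't4'], ['t4', 't5', 't6']]
```

Because the window is longer than the slide, consecutive windows overlap by one batch (t2, then t4).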

Let's see how this could be applied to the application that was presented at the beginning of this section. Suppose you want to generate a word count every 10 seconds over the last 60 seconds of data. The reduceByKey operation needs to be applied on the (word, 1) pairs of the DStream over the last 60 seconds of data. This can be achieved with the reduceByKeyAndWindow operation. In the following code, pairs is the DStream of (word, 1) pairs, that is, the result of words.map(x => (x, 1)) in the earlier example. When translated into Scala code, this is as follows:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(60), Seconds(10))

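For Python, it is as follows. This variant also passes an inverse reduce function so that each window is calculated incrementally, which requires checkpointing to be enabled on the streaming context: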

windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 60, 10)

The following table lists some of the common window operations supported by Spark for DStreams:

Transformation Purpose
window(windowLength, slideInterval) Returns a new DStream. It is based on windowed batches of the source.
countByWindow(windowLength, slideInterval) Returns a sliding window count (based on the windowLength and slideInterval parameters) of elements in the source DStream.
reduceByWindow(func, windowLength, slideInterval) Returns a new single element DStream. It is created by aggregating elements in the source DStream over a sliding interval by applying the func reduce function (which should be associative and commutative to allow for correct parallel computation).
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) Returns a new DStream of (K, V) pairs (the same K and V as for the source DStream). The values for each key are aggregated using the func input function over batches (defined by the windowLength and slideInterval arguments) in a sliding window. The number of parallel tasks to do the grouping is two (default) in local mode, while in cluster mode this is given by the Spark configuration property spark.default.parallelism. numTasks is an optional argument to specify a custom number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) This is a more efficient version of the reduceByKeyAndWindow transformation. This time, the reduce value of the current window is calculated incrementally using the reduce values of the previous one. This happens by reducing the new data that enters a window while inverse reducing the old data that leaves the same one. Please note that this mechanism only works if the func reduce function has a corresponding inverse reduce function, invFunc.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) Returns a DStream of (K, Long) pairs when called on a source DStream of (K, V) pairs. The value of each key in the returned DStream is its frequency within a given sliding window (defined by the windowLength and slideInterval arguments). numTasks is an optional argument to specify a custom number of tasks.
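The difference between the two reduceByKeyAndWindow variants can be sketched in plain Python for a single key. This is an illustration under simplifying assumptions, not Spark's implementation: the function names are hypothetical, the slide interval is one batch, and the reduce function is addition (so 0 is used as the starting value).

```python
def window_sums_recomputed(batch_sums, window_length):
    """Reduce every window from scratch (slide interval of one batch)."""
    return [sum(batch_sums[max(0, end - window_length):end])
            for end in range(1, len(batch_sums) + 1)]


def window_sums_incremental(batch_sums, window_length, func, inv_func):
    """Derive each window from the previous one: add what enters, remove what leaves."""
    results = []
    current = 0
    for i, entering in enumerate(batch_sums):
        current = func(current, entering)          # new batch enters the window
        if i >= window_length:
            leaving = batch_sums[i - window_length]
            current = inv_func(current, leaving)   # old batch leaves the window
        results.append(current)
    return results


# Hypothetical per-batch counts of one word
counts = [3, 1, 4, 1, 5, 9]
direct = window_sums_recomputed(counts, window_length=3)
incremental = window_sums_incremental(
    counts, 3, lambda x, y: x + y, lambda x, y: x - y)
print(direct)       # [3, 4, 8, 6, 10, 15]
print(incremental)  # [3, 4, 8, 6, 10, 15]
```

Both approaches give the same results, but the incremental one touches only two batches per window regardless of the window length. This is possible only because subtraction undoes addition.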

Cluster mode using different managers

The following diagram shows how Spark applications run on a cluster. They are independent sets of processes that are coordinated by the SparkContext object in the Driver Program. SparkContext connects to a Cluster Manager, which is responsible for allocating resources across applications. Once the SparkContext is connected, Spark gets executors across cluster nodes.

Executors are processes that execute computations and store data for a given Spark application. SparkContext sends the application code (which could be a JAR file for Scala or .py files for Python) to the executors. Finally, it sends the tasks to run to the executors:

Figure 1.12

To isolate applications from each other, every Spark application receives its own executor processes. They stay alive for the duration of the whole application and run tasks in multithreading mode. The downside to this is that it isn't possible to share data across different Spark applications; to share it, data needs to be persisted to an external storage system.

Spark supports different cluster managers, but it is agnostic to the underlying type.

The driver program, at execution time, must be network addressable from the worker nodes because it has to listen for and accept incoming connections from its executors. Because it schedules tasks on the cluster, it should be executed close to the worker nodes, on the same local area network (if possible).

The following are the cluster managers that are currently supported in Spark:

  • Standalone: A simple cluster manager that makes it easy to set up a cluster. It is included with Spark.
  • Apache Mesos: An open source project that's used to manage computer clusters, and was developed at the University of California, Berkeley.
  • Hadoop YARN: The resource manager available in Hadoop starting from release 2.
  • Kubernetes: An open source platform for providing a container-centric infrastructure. Kubernetes support in Spark is still experimental, so it's probably not ready for production yet.

Standalone mode

For standalone mode, you only need to place a compiled version of Spark on each node of the cluster. All the cluster nodes need to be able to resolve the hostnames of the other cluster members and must be routable to one another. The Spark master URL can be configured in the $SPARK_HOME/conf/spark-defaults.conf file on all of the nodes:

spark.master                     spark://<master_hostname_or_IP>:7077

Then, the hostname or IP address of the Spark master node needs to be specified in the $SPARK_HOME/conf/spark-env.sh file on all of the nodes, as follows:

SPARK_MASTER_HOST=<master_hostname_or_IP>

It is now possible to start a standalone master server by executing the following script:

$SPARK_HOME/sbin/start-master.sh

Once the master has started, a web UI will be available at the http://<master_hostname_or_IP>:8080 URL. From there, it is possible to obtain the master URL that's to be used when starting the workers. One or more workers can now be started by executing the following script:

$SPARK_HOME/sbin/start-slave.sh <master-spark-URL>

Each worker, after the start, comes with its own web UI, whose URL is http://<worker_hostname_or_IP>:8081.

The list of workers, along with other information about their number of CPUs and memory, can be found in the master's web UI.

What has been described so far is the way to run a standalone cluster manually. It is also possible to use the provided launch scripts. A $SPARK_HOME/conf/slaves file needs to be created as a preliminary step. It must contain the hostnames (one per line) of all of the machines where the Spark workers should start. Passwordless SSH (short for Secure Shell) from the Spark master to the Spark slaves needs to be enabled to allow remote login for the slave daemon startup and shutdown actions. A cluster can then be launched or stopped using the following shell scripts, which are available in the $SPARK_HOME/sbin directory:

  • start-master.sh: Starts a master instance
  • start-slaves.sh: Starts a slave instance on each machine specified in the conf/slaves file
  • start-slave.sh: Starts a single slave instance
  • start-all.sh: Starts both a master and a number of slaves
  • stop-master.sh: Stops a master that has been started via the sbin/start-master.sh script
  • stop-slaves.sh: Stops all slave instances on the nodes specified in the conf/slaves file
  • stop-all.sh: Stops both a master and its slaves

These scripts must be executed on the machine the Spark master will run on.

It is possible to run an interactive Spark shell against a cluster in the following way:

$SPARK_HOME/bin/spark-shell --master <master-spark-URL>

The $SPARK_HOME/bin/spark-submit script can be used to submit a compiled Spark application to the cluster. Spark currently supports two deploy modes for standalone clusters: client and cluster. In client mode, the driver and the client that submits the application are launched in the same process, while in cluster mode, the driver is launched from one of the worker processes and the client process exits as soon as it completes submitting the application (it doesn't have to wait for the application to finish).
When an application is launched through spark-submit, its JAR file is automatically distributed to all the worker nodes. Any additional JAR that an application depends on should be specified through the --jars flag using a comma as a delimiter (for example, --jars jar1,jar2).
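As an illustration of how these options fit together, the following Python sketch assembles a spark-submit argument list. The build_submit_command helper, the hostname, the class name, and the JAR names are all hypothetical; only the --master, --deploy-mode, --class, and --jars flags come from spark-submit itself.

```python
def build_submit_command(master_url, app_jar, main_class,
                         deploy_mode="client", extra_jars=(), app_args=()):
    """Assemble a spark-submit argument list for a standalone cluster."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    command = ["spark-submit",
               "--master", master_url,
               "--deploy-mode", deploy_mode,
               "--class", main_class]
    if extra_jars:
        # --jars takes a single comma-delimited value, without spaces
        command += ["--jars", ",".join(extra_jars)]
    command.append(app_jar)
    command += list(app_args)
    return command


# Hypothetical host, class, and JAR names
cmd = build_submit_command("spark://master-host:7077", "wordcount.jar",
                           "com.example.WordCount", deploy_mode="cluster",
                           extra_jars=["dep1.jar", "dep2.jar"],
                           app_args=["localhost", "9999"])
print(" ".join(cmd))
```

Keeping the command as a list (rather than a single string) makes it safe to hand over to subprocess.run without shell quoting issues.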

As mentioned in the Apache Spark fundamentals section, in standalone mode, the Spark master is a single point of failure. This means that if the Spark master node should go down, the Spark cluster would stop functioning and all currently submitted or running applications would fail, and it wouldn't be possible to submit new applications.

High availability can be configured using Apache ZooKeeper (https://zookeeper.apache.org/), an open source and highly reliable distributed coordination service. Alternatively, Spark can be deployed as a cluster through Mesos or YARN, which we will talk about in the following two sections.

Mesos cluster mode

Spark can run on clusters that are managed by Apache Mesos (http://mesos.apache.org/). Mesos is a cross-platform, cloud provider-agnostic, centralized, and fault-tolerant cluster manager, designed for distributed computing environments. Among its main features, it provides resource management and isolation, and the scheduling of CPU and memory across the cluster. It can join multiple physical resources into a single virtual one, and in doing so is different from classic virtualization, where a single physical resource is split into multiple virtual resources. With Mesos, it is possible to build or schedule cluster frameworks such as Apache Spark (though it is not restricted to just this). The following diagram shows the Mesos architecture:

Figure 1.13

Mesos consists of a master daemon and frameworks. The master daemon manages agent daemons running on each cluster node, while the Mesos frameworks run tasks on the agents. The master enables fine-grained sharing of resources (including CPU and RAM) across frameworks by making them resource offers. It decides how much of the available resources to offer to each framework, depending on given organizational policies. To support diverse sets of policies, the master uses a modular architecture that makes it easy to add new allocation modules through a plugin mechanism. A Mesos framework consists of two components: a scheduler, which registers itself with the master to be offered resources, and an executor, a process that is launched on agent nodes to execute the framework's tasks. While it is the master that determines how many resources are offered to each framework, the frameworks' schedulers are responsible for selecting which of the offered resources to use. The moment a framework accepts offered resources, it passes a description of the tasks it wants to execute on them to Mesos. Mesos, in turn, launches the tasks on the corresponding agents.
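The offer-and-accept cycle described above can be pictured with a toy model in plain Python. This is only a sketch under strong assumptions: run_offer_cycle is a hypothetical function, CPUs are the only resource, frameworks are served in list order, and no real allocation policy is modeled.

```python
def run_offer_cycle(agents, frameworks):
    """Toy two-level scheduling round.

    agents: {agent_name: free_cpus}
    frameworks: list of (framework_name, cpus_per_task, tasks_wanted)
    The 'master' offers the free CPUs of each agent to each framework in
    turn; the framework's 'scheduler' decides how much of an offer to use.
    """
    free = dict(agents)
    launched = []
    for name, cpus_per_task, tasks_wanted in frameworks:
        for agent in sorted(free):
            # Scheduler side: accept as many task slots as the offer can hold
            while tasks_wanted > 0 and free[agent] >= cpus_per_task:
                free[agent] -= cpus_per_task
                tasks_wanted -= 1
                # Master side: the task is launched on the offering agent
                launched.append((name, agent))
    return launched, free


launched, free = run_offer_cycle(
    {"agent1": 4, "agent2": 2},
    [("spark", 2, 2), ("other", 1, 3)])
print(launched)
# [('spark', 'agent1'), ('spark', 'agent1'), ('other', 'agent2'), ('other', 'agent2')]
print(free)
# {'agent1': 0, 'agent2': 0}
```

In this run, the second framework wanted three tasks but could launch only two, because the first framework had already accepted all of agent1's CPUs.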

The advantages of deploying a Spark cluster using Mesos in place of the Spark standalone cluster manager include the following:

  • Dynamic partitioning between Spark and other frameworks
  • Scalable partitioning between multiple instances of Spark

Spark 2.2.1 is designed to be used with Mesos 1.0.0+. In this section, I won't describe the steps to deploy a Mesos cluster; I am assuming that a Mesos cluster is already available and running. No particular procedure or patch is required in terms of Mesos installation to run Spark on it. To verify that the Mesos cluster is ready for Spark, navigate to the Mesos master web UI at port 5050:

Figure 1.14

Check that all of the expected machines are present in the Agents tab.

To use Mesos from Spark, a Spark binary package needs to be available in a place that's accessible by Mesos itself, and a Spark driver program needs to be configured to connect to Mesos. Alternatively, it is possible to install Spark in the same location across all the Mesos slaves and then configure the spark.mesos.executor.home property (the default value is $SPARK_HOME) to point to that location.

The Mesos master URLs have the form mesos://host:5050 for a single-master Mesos cluster, or mesos://zk://host1:2181,host2:2181,host3:2181/mesos for a multi-master Mesos cluster when using Zookeeper.
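The two URL forms can be generated with a small helper. The following Python sketch is only illustrative: mesos_master_url is a hypothetical function, and the default ports and the /mesos path are simply the values used in the examples above.

```python
def mesos_master_url(hosts, zookeeper=False, mesos_port=5050,
                     zk_port=2181, zk_path="/mesos"):
    """Build a Spark master URL for a Mesos cluster."""
    if zookeeper:
        # Multi-master cluster: list every ZooKeeper host, comma-separated
        quorum = ",".join("{}:{}".format(h, zk_port) for h in hosts)
        return "mesos://zk://{}{}".format(quorum, zk_path)
    if len(hosts) != 1:
        raise ValueError("a single-master URL needs exactly one host")
    return "mesos://{}:{}".format(hosts[0], mesos_port)


single = mesos_master_url(["host"])
multi = mesos_master_url(["host1", "host2", "host3"], zookeeper=True)
print(single)  # mesos://host:5050
print(multi)   # mesos://zk://host1:2181,host2:2181,host3:2181/mesos
```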

The following is an example of how to start a Spark shell on a Mesos cluster:

$SPARK_HOME/bin/spark-shell --master mesos://127.0.0.1:5050 -c spark.mesos.executor.home=`pwd`

A Spark application can be submitted to a Mesos managed Spark cluster as follows:

$SPARK_HOME/bin/spark-submit --master mesos://127.0.0.1:5050 --total-executor-cores 2 --executor-memory 3G  $SPARK_HOME/examples/src/main/python/pi.py 100

YARN cluster mode

YARN (http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YARN.html), which was introduced in Apache Hadoop 2.0, brought significant improvements in terms of scalability, high availability, and support for different paradigms. In the Hadoop version 1 MapReduce framework, job execution was controlled by two types of processes: a single master process called JobTracker coordinated all the jobs running on the cluster and assigned map and reduce tasks to run on the TaskTrackers, which were a number of subordinate processes running assigned tasks and periodically reporting the progress to the JobTracker. Having a single JobTracker was a scalability bottleneck. The maximum cluster size was a little more than 4,000 nodes, with the number of concurrent tasks limited to 40,000. Furthermore, the JobTracker was a single point of failure and the only available programming model was MapReduce.

The fundamental idea of YARN is to split up the functionalities of resource management and job scheduling or monitoring into separate daemons. The idea is to have a global ResourceManager and per-application ApplicationMaster (App Mstr). An application is either a single job or a DAG of jobs. The following is a diagram of YARN's architecture:

Figure 1.15

The ResourceManager and the NodeManager form the YARN framework. The ResourceManager decides on resource usage across all the running applications, while the NodeManager is an agent running on any machine in the cluster and is responsible for the containers by monitoring their resource usage (including CPU and memory) and reporting to the ResourceManager. The ResourceManager consists of two components: the scheduler and the ApplicationsManager. The scheduler is the component that's responsible for allocating resources to the various applications running, and it doesn't perform any monitoring of applications' statuses, nor offer guarantees about restarting any failed tasks. It performs scheduling based on an application's resource requirements.

The ApplicationsManager accepts job submissions and provides a service to restart the App Mstr container on any failure. The per-application App Mstr is responsible for negotiating the appropriate resource containers from the scheduler and monitoring their status and progress. YARN, by its nature, is a general scheduler, so support for non-MapReduce jobs (such as Spark jobs) is available for Hadoop clusters.

Submitting Spark applications on YARN

To launch Spark applications on YARN, the HADOOP_CONF_DIR or YARN_CONF_DIR env variable needs to be set to point to the directory that contains the client-side configuration files for the Hadoop cluster. These configurations are needed to connect to the YARN ResourceManager and to write to HDFS. This configuration is distributed to the YARN cluster so that all the containers used by the Spark application have the same configuration. To launch Spark applications on YARN, two deployment modes are available:

  • Cluster mode: In this case, the Spark driver runs inside an application master process that's managed by YARN on the cluster. The client can finish its execution after initiating the application.
  • Client mode: In this case, the driver and the client run in the same process. The application master is used for the sole purpose of requesting resources from YARN.

Unlike the other modes, in which the master's address is specified in the master parameter, in YARN mode, the ResourceManager's address is retrieved from the Hadoop configuration. Therefore, the master parameter value is always yarn.
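The two prerequisites described above (a configuration directory exposed through an environment variable, and a master value that is always yarn) can be captured in a short Python sketch. Both helper names are hypothetical, and checking HADOOP_CONF_DIR before YARN_CONF_DIR is an arbitrary choice made for this example.

```python
import os


def resolve_hadoop_conf_dir(env=None):
    """Return the directory named by HADOOP_CONF_DIR or YARN_CONF_DIR.

    Raises RuntimeError when neither variable is set.
    """
    env = os.environ if env is None else env
    for name in ("HADOOP_CONF_DIR", "YARN_CONF_DIR"):
        value = env.get(name)
        if value:
            return value
    raise RuntimeError(
        "set HADOOP_CONF_DIR or YARN_CONF_DIR before submitting to YARN")


def yarn_submit_args(deploy_mode):
    """The master value is always 'yarn'; only the deploy mode varies."""
    if deploy_mode not in ("client", "cluster"):
        raise ValueError("deploy_mode must be 'client' or 'cluster'")
    return ["--master", "yarn", "--deploy-mode", deploy_mode]


# Hypothetical environment, passed explicitly so the example is reproducible
conf_dir = resolve_hadoop_conf_dir({"HADOOP_CONF_DIR": "/etc/hadoop/conf"})
print(conf_dir)                     # /etc/hadoop/conf
print(yarn_submit_args("cluster"))  # ['--master', 'yarn', '--deploy-mode', 'cluster']
```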

You can use the following command to launch a Spark application in cluster mode:

$SPARK_HOME/bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

In cluster mode, since the driver runs on a different machine than the client, the SparkContext.addJar method doesn't work with the files that are local to the client. The only choice is to include them using the --jars option in the launch command.

Launching a Spark application in client mode happens the same way; the --deploy-mode option value needs to change from cluster to client.

Kubernetes cluster mode

Kubernetes (https://kubernetes.io/) is an open source system that's used to automate the deployment, scaling, and management of containerized applications. It was originally implemented at Google and then open sourced in 2014. The following are the main concepts of Kubernetes:

  • Pod: This is the smallest deployable unit of computing that can be created and managed. A pod can be seen as a group of one or more containers that share network and storage space, which also contains a specification for how to run those containers.
  • Deployment: This is a layer of abstraction whose primary purpose is to declare how many replicas of a pod should be running at a time.
  • Ingress: This is an open channel for communication with a service running in a pod.
  • Node: This is a representation of a single machine in a cluster.
  • Persistent volume: This provides a filesystem that can be mounted to a cluster, not to be associated with any particular node. This is the way Kubernetes persists information (data, files, and so on).

The following diagram (source: https://d33wubrfki0l68.cloudfront.net/518e18713c865fe67a5f23fc64260806d72b38f5/61d75/images/docs/post-ccm-arch.png) shows the Kubernetes architecture:

Figure 1.16

The main components of the Kubernetes architecture are as follows:

  • Cloud controller manager: It runs the Kubernetes controllers
  • Controllers: There are four of them: node, route, service, and PersistentVolumeLabels
  • Kubelets: The primary agents that run on nodes

The submission of Spark jobs to a Kubernetes cluster can be done directly through spark-submit. Kubernetes requires that we supply Docker (https://www.docker.com/) images that can be deployed into containers within pods. Starting from the 2.3 release, Spark provides a Dockerfile ($SPARK_HOME/kubernetes/dockerfiles/Dockerfile, which can also be customized to match specific applications' needs) and a script ($SPARK_HOME/bin/docker-image-tool.sh) that can be used to build and publish Docker images that are to be used within a Kubernetes backend. The following is the syntax that's used to build a Docker image through the provided script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag build

The following is the syntax to push an image to a Docker repository while using the same script:

$SPARK_HOME/bin/docker-image-tool.sh -r <repo> -t my-tag push

A job can be submitted in the following way:

$SPARK_HOME/bin/spark-submit \
--master k8s://https://<k8s_hostname>:<k8s_port> \
--deploy-mode cluster \
--name <application-name> \
--class <package>.<ClassName> \
--conf spark.executor.instances=<instance_count> \
--conf spark.kubernetes.container.image=<spark-image> \
local:///path/to/<sparkjob>.jar

Kubernetes requires application names to contain only lowercase alphanumeric characters, hyphens, and dots, and to start and end with an alphanumeric character.
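The naming rule can be checked before submitting a job. The following Python sketch encodes only the constraints stated above; the is_valid_application_name helper is hypothetical, and any additional limits (such as a maximum length) are not checked here.

```python
import re

# Lowercase alphanumerics, '-' and '.', starting and ending with an alphanumeric
_NAME_PATTERN = re.compile(r"[a-z0-9]([-a-z0-9.]*[a-z0-9])?")


def is_valid_application_name(name):
    """Return True if the whole name satisfies the naming rule."""
    return _NAME_PATTERN.fullmatch(name) is not None


for candidate in ["spark-pi", "spark.pi-2", "Spark-Pi", "-spark", "spark_pi", "spark-"]:
    print(candidate, is_valid_application_name(candidate))
```

Only the first two candidates pass: the others contain an uppercase letter or an underscore, or they start or end with a hyphen.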

The following diagram shows the way the submission mechanism works:

Figure 1.17

Here's what happens:

  • Spark creates a driver that's running within a Kubernetes pod
  • The driver creates the executors, which also run within Kubernetes pods, and then connects to them and executes application code
  • At the end of the execution, the executor pods terminate and are cleaned up, while the driver pod still persists logs and remains in a completed state (which means that it doesn't use cluster computation or memory resources) in the Kubernetes API (until it's eventually garbage collected or manually deleted)

Summary

In this chapter, we became familiar with Apache Spark and most of its main modules. We started to use the available Spark shells and wrote our first self-contained application using the Scala and Python programming languages. Finally, we explored different ways of deploying and running Spark in cluster mode. Everything we have learned about so far is necessary for understanding the topics that are presented from Chapter 3, Extract, Transform, Load, onward. If you have any doubts about any of the presented topics, I suggest that you go back and read this chapter again before moving on.

In the next chapter, we are going to explore the basics of DL, with an emphasis on some particular implementations of multi-layer neural networks.


Key benefits

  • Explore the world of distributed deep learning with Apache Spark
  • Train neural networks with deep learning libraries such as BigDL and TensorFlow
  • Develop Spark deep learning applications to intelligently handle large and complex datasets

Description

Deep learning is a subset of machine learning where datasets with several layers of complexity can be processed. Hands-On Deep Learning with Apache Spark addresses the sheer complexity of technical and analytical parts and the speed at which deep learning solutions can be implemented on Apache Spark. The book starts with the fundamentals of Apache Spark and deep learning. You will set up Spark for deep learning, learn principles of distributed modeling, and understand different types of neural nets. You will then implement deep learning models, such as convolutional neural networks (CNNs), recurrent neural networks (RNNs), and long short-term memory (LSTM) on Spark. As you progress through the book, you will gain hands-on experience of what it takes to understand the complex datasets you are dealing with. During the course of this book, you will use popular deep learning frameworks, such as TensorFlow, Deeplearning4j, and Keras to train your distributed models. By the end of this book, you'll have gained experience with the implementation of your models on a variety of use cases.

Who is this book for?

If you are a Scala developer, data scientist, or data analyst who wants to learn how to use Spark for implementing efficient deep learning models, Hands-On Deep Learning with Apache Spark is for you. Knowledge of the core machine learning concepts and some exposure to Spark will be helpful.

What you will learn

  • Understand the basics of deep learning
  • Set up Apache Spark for deep learning
  • Understand the principles of distribution modeling and different types of neural networks
  • Obtain an understanding of deep learning algorithms
  • Discover textual analysis and deep learning with Spark
  • Use popular deep learning frameworks, such as Deeplearning4j, TensorFlow, and Keras
  • Explore popular deep learning algorithms

Product Details

Publication date : Jan 31, 2019
Length: 322 pages
Edition : 1st
Language : English
ISBN-13 : 9781788994613

Packt Subscriptions

See our plans and pricing
$19.99 billed monthly
  • Unlimited access to Packt's library of 7,000+ practical books and videos
  • Constantly refreshed with 50+ new titles a month
  • Exclusive early access to books as they're written
  • Solve problems while you work with advanced search and reference features
  • Offline reading on the mobile app
  • Simple pricing, no contract

$199.99 billed annually
  • Unlimited access to Packt's library of 7,000+ practical books and videos
  • Constantly refreshed with 50+ new titles a month
  • Exclusive early access to books as they're written
  • Solve problems while you work with advanced search and reference features
  • Offline reading on the mobile app
  • Choose a DRM-free eBook or Video every month to keep
  • PLUS own as many other DRM-free eBooks or Videos as you like for just $5 each
  • Exclusive print discounts

$279.99 billed in 18 months
  • Unlimited access to Packt's library of 7,000+ practical books and videos
  • Constantly refreshed with 50+ new titles a month
  • Exclusive early access to books as they're written
  • Solve problems while you work with advanced search and reference features
  • Offline reading on the mobile app
  • Choose a DRM-free eBook or Video every month to keep
  • PLUS own as many other DRM-free eBooks or Videos as you like for just $5 each
  • Exclusive print discounts

Frequently bought together


Machine Learning with Apache Spark Quick Start Guide
$32.99
Apache Spark Deep Learning Cookbook
$54.99
Hands-On Deep Learning with Apache Spark
$48.99
Total: $136.97

Table of Contents

16 Chapters
The Apache Spark Ecosystem
Deep Learning Basics
Extract, Transform, Load
Streaming
Convolutional Neural Networks
Recurrent Neural Networks
Training Neural Networks with Spark
Monitoring and Debugging Neural Network Training
Interpreting Neural Network Output
Deploying on a Distributed System
NLP Basics
Textual Analysis and Deep Learning
Convolution
Image Classification
What's Next for Deep Learning?
Other Books You May Enjoy

FAQs

What is the delivery time and cost of a print book?

Shipping Details

USA:

Economy: Delivery to most addresses in the US within 10-15 business days

Premium: Trackable Delivery to most addresses in the US within 3-8 business days

UK:

Economy: Delivery to most addresses in the U.K. within 7-9 business days.
Shipments are not trackable

Premium: Trackable delivery to most addresses in the U.K. within 3-4 business days!
Add one extra business day for deliveries to Northern Ireland and Scottish Highlands and islands

EU:

Premium: Trackable delivery to most EU destinations within 4-9 business days.

Australia:

Economy: Can deliver to P. O. Boxes and private residences.
Trackable service with delivery to addresses in Australia only.
Delivery time ranges from 7-9 business days for VIC and 8-10 business days for Interstate metro
Delivery time is up to 15 business days for remote areas of WA, NT & QLD.

Premium: Delivery to addresses in Australia only
Trackable delivery to most P. O. Boxes and private residences in Australia within 4-5 days based on the distance to a destination following dispatch.

India:

Premium: Delivery to most Indian addresses within 5-6 business days

Rest of the World:

Premium: Countries in the American continent: Trackable delivery to most countries within 4-7 business days

Asia:

Premium: Delivery to most Asian addresses within 5-9 business days

Disclaimer:
All orders received before 5 PM U.K. time will start printing on the next business day, so estimated delivery times also start from the next day. Orders received after 5 PM U.K. time (in our internal systems) on a business day, or at any time on the weekend, will begin printing on the second business day after the order. For example, an order placed at 11 AM today will begin printing tomorrow, whereas an order placed at 9 PM tonight will begin printing the day after tomorrow.


Unfortunately, due to several restrictions, we are unable to ship to the following countries:

  1. Afghanistan
  2. American Samoa
  3. Belarus
  4. Brunei Darussalam
  5. Central African Republic
  6. The Democratic Republic of Congo
  7. Eritrea
  8. Guinea-Bissau
  9. Iran
  10. Lebanon
  11. Libyan Arab Jamahiriya
  12. Somalia
  13. Sudan
  14. Russian Federation
  15. Syrian Arab Republic
  16. Ukraine
  17. Venezuela
What is customs duty/charge?

Customs duties are charges levied on goods when they cross international borders. They are a tax imposed on imported goods. These duties are charged by special authorities and bodies created by local governments and are meant to protect local industries, economies, and businesses.

Do I have to pay customs charges for the print book order?

Orders shipped to countries listed under EU27 do not incur customs charges for the customer. Packt pays them as part of the order.

List of EU27 countries: www.gov.uk/eu-eea

For shipments to countries outside the EU27, customs duty or localized taxes may apply. These are charged by the recipient country, must be paid by the customer, and are not included in the shipping charges on the order.

How do I know my customs duty charges?

The amount of duty payable varies greatly depending on the imported goods, the country of origin and several other factors like the total invoice amount or dimensions like weight, and other such criteria applicable in your country.

For example:

  • If you live in Mexico and the declared value of your ordered items is over $50, you will have to pay the courier service an additional import tax of 19% (for example, $9.50 on a declared value of $50) to receive your package.
  • If you live in Turkey and the declared value of your ordered items is over €22, you will have to pay the courier service an additional import tax of 18% (for example, €3.96 on a declared value of €22) to receive your package.

How can I cancel my order?

Cancellation Policy for Published Printed Books:

You can cancel any order within 1 hour of placing it. Simply contact customercare@packt.com with your order details or payment transaction ID. If your order has already started the shipment process, we will do our best to stop it. However, if it is already on its way to you, you can contact us at customercare@packt.com once you receive it and use the returns and refund process.

Please understand that Packt Publishing cannot provide refunds or cancel any order except in the cases described in our Return Policy (i.e., Packt Publishing agrees to replace your printed book because it arrives damaged or has a material defect). Otherwise, Packt Publishing will not accept returns.

What is your returns and refunds policy?

Return Policy:

We want you to be happy with your purchase from Packtpub.com. We will not hassle you with returning print books to us. If the print book you receive from us is incorrect, damaged, doesn't work, or is unacceptably late, please contact the Customer Relations Team at customercare@packt.com with the order number and issue details, as explained below:

  1. If you ordered an item (eBook, Video, or Print Book) incorrectly or accidentally, please contact the Customer Relations Team at customercare@packt.com within one hour of placing the order and we will replace the item or refund its cost.
  2. If your eBook or Video file is faulty, or a fault occurs while the eBook or Video is being made available to you (i.e., during download), contact the Customer Relations Team at customercare@packt.com within 14 days of purchase, and they will resolve the issue for you.
  3. You will have a choice of replacement or refund for the problem items (damaged, defective, or incorrect).
  4. Once the Customer Care Team confirms that you will be refunded, you should receive the refund within 10 to 12 working days.
  5. If you are requesting a refund for only one book from a multiple-item order, we will refund you for the appropriate single item.
  6. Where the items were shipped under a free shipping offer, there will be no shipping costs to refund.

On the off chance your printed book arrives damaged or with a material defect, contact our Customer Relations Team at customercare@packt.com within 14 days of receipt of the book with appropriate evidence of the damage, and we will work with you to secure a replacement copy, if necessary. Please note that each printed book you order from us is individually made by Packt's professional book-printing partner on a print-on-demand basis.

What tax is charged?

Currently, no tax is charged on the purchase of any print book (subject to change based on laws and regulations). A localized VAT fee is charged only to our European and UK customers on the eBooks, videos, and subscriptions that they buy. GST is charged to Indian customers for eBook and video purchases.

What payment methods can I use?

You can pay with the following payment methods:

  1. Visa Debit
  2. Visa Credit
  3. MasterCard
  4. PayPal