In this section, we explore key concepts related to Resilient Distributed Datasets (RDDs), DataFrames, and Datasets, as well as the Catalyst optimizer and Project Tungsten.
Understanding Spark SQL concepts
Understanding Resilient Distributed Datasets (RDDs)
RDDs are Spark's primary distributed Dataset abstraction. An RDD is a collection of data that is immutable, distributed, lazily evaluated, type inferred, and cacheable. Prior to execution, the developer code (using higher-level constructs such as SQL, DataFrames, and Dataset APIs) is converted to a DAG of RDDs (ready for execution).
You can create RDDs by parallelizing an existing collection of data or accessing a Dataset residing in an external storage system, such as the file system or various Hadoop-based data sources. The parallelized collections form a distributed Dataset that enables parallel operations on them.
You can create an RDD from an input file with the number of partitions specified, as shown:
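The first route, parallelizing an existing collection, can be sketched as follows. This is a minimal illustration that assumes a local SparkSession; the collection contents, the partition count of 4, and the name numbersRDD are arbitrary choices made for this sketch.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; in spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[2]").appName("rdd-parallelize-sketch").getOrCreate()
val sc = spark.sparkContext

// Distribute a local collection across 4 partitions (the count is an arbitrary choice).
val numbersRDD = sc.parallelize(1 to 100, 4)

println(numbersRDD.partitions.size)   // number of partitions requested above
println(numbersRDD.map(_ * 2).sum())  // a parallel operation over the distributed data
```

If the second argument is omitted, Spark picks a default number of partitions based on the cluster configuration.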
scala> val cancerRDD = sc.textFile("file:///Users/aurobindosarkar/Downloads/", 4)
scala> cancerRDD.partitions.size
res37: Int = 4
You can implicitly convert the RDD to a DataFrame by importing the spark.implicits package and using the toDF() method:
scala> import spark.implicits._
scala> val cancerDF = cancerRDD.toDF()
To create a DataFrame with a specific schema, we define a function that builds a Row object for each row contained in the DataFrame. We then split the comma-separated data, convert each line to a list of fields, and map it to a Row object. Finally, we use createDataFrame() to create the DataFrame with the specified schema:
def row(line: List[String]): Row = { Row(line(0).toLong, line(1).toInt, line(2).toInt, line(3).toInt, line(4).toInt, line(5).toInt, line(6).toInt, line(7).toInt, line(8).toInt, line(9).toInt, line(10).toInt) }
val data = cancerRDD.map(_.split(",").to[List]).map(row)
val cancerDF = spark.createDataFrame(data, recordSchema)
Further, we can easily convert the preceding DataFrame to a Dataset using the case class defined earlier:
scala> val cancerDS = cancerDF.as[CancerClass]
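Because the schema and case class used above are defined elsewhere, the following self-contained sketch repeats the same pattern end to end. Everything specific in it is hypothetical: the Sample case class, the three-column sampleSchema, and the two made-up input lines stand in for the real file and its schema.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

// Hypothetical record type and schema, for illustration only.
case class Sample(id: Long, thickness: Int, label: Int)

val spark = SparkSession.builder().master("local[2]").appName("schema-sketch").getOrCreate()
import spark.implicits._

val sampleSchema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  StructField("thickness", IntegerType, nullable = false),
  StructField("label", IntegerType, nullable = false)))

// Two made-up comma-separated lines standing in for the input file.
val lines = spark.sparkContext.parallelize(Seq("1,5,2", "2,4,4"))

// Split each line, then map the fields to a Row that matches the schema.
val rows = lines.map(_.split(",")).map(f => Row(f(0).toLong, f(1).toInt, f(2).toInt))

val sampleDF = spark.createDataFrame(rows, sampleSchema)  // DataFrame with an explicit schema
val sampleDS = sampleDF.as[Sample]                        // typed Dataset via the case class

sampleDS.show()
```

The field names in the case class must match the column names in the schema for the as[...] conversion to resolve.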
RDD data is logically divided into a set of partitions; additionally, all input, intermediate, and output data is also represented as partitions. The number of RDD partitions defines the level of data fragmentation. These partitions are also the basic units of parallelism. Spark execution jobs are split into multiple stages, and each stage runs as a set of tasks, with each task operating on one partition at a time, so it is very important to tune the number of partitions. Having fewer partitions than available cores means your cluster could be under-utilized, while an excessive number of partitions could hurt performance due to higher scheduling overhead and higher disk and network I/O.
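As a rough sketch of how the partition count can be inspected and changed, consider the following. It assumes a local session, and the partition counts (2, 8, and 4) are arbitrary values chosen for illustration rather than tuning recommendations.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[4]").appName("partitions-sketch").getOrCreate()

val rdd = spark.sparkContext.parallelize(1 to 1000, 2)
println(rdd.getNumPartitions)  // partitions requested at creation

// repartition() performs a full shuffle and can increase the partition count.
val wider = rdd.repartition(8)

// coalesce() merges existing partitions without a full shuffle, so by default it only reduces the count.
val narrower = wider.coalesce(4)

println(wider.getNumPartitions)
println(narrower.getNumPartitions)
```

Since coalesce() avoids a shuffle, it is usually the cheaper choice when you only need fewer partitions.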
The programming interface to RDDs supports two types of operations: transformations and actions. The transformations create a new Dataset from an existing one, while the actions return a value or result of a computation. All transformations are evaluated lazily; the actual execution occurs only when an action is executed to compute a result. The transformations form a lineage graph instead of actually replicating data across multiple machines. This graph-based approach enables an efficient fault tolerance model. For example, if an RDD partition is lost, then it can be recomputed based on the lineage graph.
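The lazy behavior can be observed with a small experiment. The sketch below is illustrative only: it assumes a local session and uses an accumulator (named mapCalls here) to record when the function passed to map() actually runs.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("lazy-sketch").getOrCreate()
val sc = spark.sparkContext

// An accumulator lets us observe when the map function actually runs.
val calls = sc.longAccumulator("mapCalls")

// Transformation: this only records the lineage, nothing is computed yet.
val squares = sc.parallelize(1 to 10, 2).map { x => calls.add(1); x * x }
val callsBeforeAction: Long = calls.value

// Action: this triggers the computation of every partition.
val total = squares.reduce(_ + _)
val callsAfterAction: Long = calls.value

println(s"before action: $callsBeforeAction, after action: $callsAfterAction, total: $total")
println(squares.toDebugString)  // the lineage Spark would use to recompute a lost partition
```

Accumulator updates made inside transformations can be repeated if a task is retried, so this technique is suited to exploration rather than exact accounting.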
You can control data persistence (for example, caching) and specify placement preferences for RDD partitions and then use specific operators for manipulating them. With the default storage level (MEMORY_ONLY), Spark keeps persisted RDD partitions in memory and recomputes any partitions that do not fit; storage levels such as MEMORY_AND_DISK spill those partitions to disk instead. Caching can improve performance substantially when an RDD is reused across several actions; however, it is often memory intensive. Other persistence options include storing RDDs on disk and replicating them across the nodes in your cluster. The in-memory storage of persistent RDDs can be in the form of deserialized or serialized Java objects. The deserialized option is faster, while the serialized option is more memory-efficient (but slower). Spark automatically evicts cached partitions in least-recently-used order when memory runs low; if a specific RDD is no longer required, you can also release it explicitly.
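The persistence options can be sketched as follows. This is a minimal example that assumes a local session; the data and the choice of MEMORY_AND_DISK_SER are for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[2]").appName("persist-sketch").getOrCreate()

val words = spark.sparkContext.parallelize(Seq("a", "b", "a", "c"), 2)

// Keep partitions in memory in serialized form and spill to disk if they do not fit.
words.persist(StorageLevel.MEMORY_AND_DISK_SER)

words.count()                                 // the first action materializes and caches the partitions
val distinctWords = words.distinct().count()  // later actions can reuse the cached partitions

println(words.getStorageLevel)                // the storage level currently assigned

words.unpersist()                             // explicitly release the cached data
```

Calling cache() instead of persist(...) uses the default storage level.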
Understanding DataFrames and Datasets
A DataFrame is similar to a table in a relational database, a pandas dataframe, or a data frame in R. It is a distributed collection of rows that is organized into columns. It uses the immutable, in-memory, resilient, distributed, and parallel capabilities of RDD, and applies a schema to the data. DataFrames are also evaluated lazily. Additionally, they provide a domain-specific language (DSL) for distributed data manipulation.
Conceptually, a DataFrame is an alias for Dataset[Row], a collection of generic objects, where a Row is a generic untyped object. This means that syntax errors for DataFrames are caught during the compile stage; however, analysis errors are detected only at runtime.
DataFrames can be constructed from a wide array of sources, such as structured data files, Hive tables, databases, or RDDs. The source data can be read from local filesystems, HDFS, Amazon S3, and RDBMSs. In addition, popular data formats, such as CSV, JSON, Avro, Parquet, and so on, are supported. You can also create and use custom data sources.
The DataFrame API is available in Scala, Java, Python, and R. The DataFrame API is declarative, and combined with procedural Spark code, it provides a much tighter integration between the relational and procedural processing in your applications. DataFrames can be manipulated using Spark's procedural API, or using relational APIs (with richer optimizations).
In the early versions of Spark, you had to write arbitrary Java, Python, or Scala functions that operated on RDDs. In this scenario, the functions were executing on opaque Java objects. Hence, the user functions were essentially black boxes executing opaque computations using opaque objects and data types. This approach was very general and such programs had complete control over the execution of every data operation. However, as the engine did not know the code you were executing or the nature of the data, it was not possible to optimize these arbitrary Java objects. In addition, it was incumbent on the developers to write efficient programs that were dependent on the nature of their specific workloads.
In Spark 2.0, the main benefit of using SQL, DataFrames, and Datasets is that it's easier to program using these high-level programming interfaces while reaping the benefits of improved performance, automatically. You have to write significantly fewer lines of code and the programs are automatically optimized and efficient code is generated for you. This results in better performance while significantly reducing the burden on developers. Now, the developer can focus on the "what" rather than the "how" of something that needs to be accomplished.
The Dataset API was first added in Spark 1.6 to provide the benefits of both RDDs and Spark SQL's optimizer. A Dataset can be constructed from JVM objects and then manipulated using functional transformations such as map, filter, and so on. As the Dataset is a collection of strongly-typed objects specified using a user-defined case class, both syntax errors and analysis errors can be detected at compile time.
The unified Dataset API can be used in both Scala and Java. However, Python does not support the Dataset API yet.
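The difference in when errors surface can be sketched as follows. The Person case class and its data are hypothetical, and the misspelled column name agee is deliberate.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.Try

case class Person(name: String, age: Int)  // hypothetical record type

val spark = SparkSession.builder().master("local[2]").appName("typed-sketch").getOrCreate()
import spark.implicits._

val peopleDS = Seq(Person("Ann", 34), Person("Bob", 19)).toDS()

// Typed, functional transformations: the compiler checks the field names and types.
val adultNames = peopleDS.filter(_.age >= 21).map(_.name).collect()

// Untyped DataFrame access: a misspelled column compiles but fails during analysis at runtime.
val peopleDF = peopleDS.toDF()
val badSelect = Try(peopleDF.select("agee"))

println(adultNames.mkString(", "))
println(badSelect.isFailure)
```

Writing peopleDS.map(_.agee) would, by contrast, be rejected by the Scala compiler before the program runs.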
In the following example, we present a few basic DataFrame/Dataset operations. For this purpose, we will use two restaurant listing datasets that are typically used in duplicate records detection and record linkage applications. The two lists, one each from Zagat's and Fodor's restaurant guides, have duplicate records between them. To keep this example simple, we have manually converted the input files to a CSV format. You can download the original dataset from
First, we define a case class for the records in the two files:
scala> case class RestClass(name: String, street: String, city: String, phone: String, cuisine: String)
Next, we create Datasets from the two files:
scala> val rest1DS = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Documents/SparkBook/data/zagats.csv").map(_.split(",")).map(attributes => RestClass(attributes(0).trim, attributes(1).trim, attributes(2).trim, attributes(3).trim, attributes(4).trim)).toDS()
scala> val rest2DS = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Documents/SparkBook/data/fodors.csv").map(_.split(",")).map(attributes => RestClass(attributes(0).trim, attributes(1).trim, attributes(2).trim, attributes(3).trim, attributes(4).trim)).toDS()
We define a UDF to clean up and transform the phone numbers in the second Dataset to match the format in the first file:
scala> def formatPhoneNo(s: String): String = s match {case s if s.contains("/") => s.replaceAll("/", "-").replaceAll("- ", "-").replaceAll("--", "-") case _ => s }
scala> val udfStandardizePhoneNos = udf[String, String]( x => formatPhoneNo(x) )
scala> val rest2DSM1 = rest2DS.withColumn("stdphone", udfStandardizePhoneNos(rest2DS.col("phone")))
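Before applying a UDF to a whole column, it can help to check its logic on a few plain strings. The sketch below repeats formatPhoneNo so that it is self-contained; the sample phone numbers are made up and are not taken from the restaurant files.

```scala
// Same function as above, repeated so that the snippet is self-contained.
def formatPhoneNo(s: String): String = s match {
  case s if s.contains("/") => s.replaceAll("/", "-").replaceAll("- ", "-").replaceAll("--", "-")
  case _ => s
}

// Made-up inputs covering the slash, slash-plus-space, and already-clean cases.
val samples = Seq("555/123-4567", "555/ 123-4567", "555-123-4567")
val cleaned = samples.map(formatPhoneNo)

samples.zip(cleaned).foreach { case (in, out) => println(s"$in -> $out") }
```

All three inputs should normalize to the same dash-separated form, which is what makes the equality join on phone numbers workable.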
Next, we create temporary views from our Datasets:
scala> rest1DS.createOrReplaceTempView("rest1Table")
scala> rest2DSM1.createOrReplaceTempView("rest2Table")
We can get a count of the number of duplicates, by executing a SQL statement on these tables that returns the count of the number of records with matching phone numbers:
scala> spark.sql("SELECT count(*) from rest1Table, rest2Table where rest1Table.phone = rest2Table.stdphone").show()
Next, we execute a SQL statement that returns a DataFrame containing the rows with matching phone numbers:
scala> val sqlDF = spark.sql("SELECT a.name, b.name, a.phone, b.stdphone from rest1Table a, rest2Table b where a.phone = b.stdphone")
The results, listing the name and phone number columns from the two tables, can be displayed (for example, with sqlDF.show()) to verify visually whether the matched records are likely duplicates.
In the next section, we will shift our focus to Spark SQL internals, more specifically, to the Catalyst optimizer and Project Tungsten.
Understanding the Catalyst optimizer
The Catalyst optimizer is at the core of Spark SQL and is implemented in Scala. It enables several key features, such as schema inference (from JSON data), that are very useful in data analysis work.
The following figure shows the high-level transformation process from a developer's program containing DataFrames/Datasets to the final execution plan:
The internal representation of the program is a query plan. The query plan describes data operations such as aggregate, join, and filter, which match what is defined in your query. These operations generate a new Dataset from the input Dataset. After we have an initial version of the query plan ready, the Catalyst optimizer applies a series of transformations to convert it to an optimized query plan. Finally, the Spark SQL code generation mechanism translates the optimized query plan into a DAG of RDDs that is ready for execution. The query plans and the optimized query plans are internally represented as trees. So, at its core, the Catalyst optimizer contains a general library for representing trees and applying rules to manipulate them. On top of this library are several other libraries that are more specific to relational query processing.
Catalyst has two types of query plans: Logical and Physical Plans. The Logical Plan describes the computations on the Datasets without defining how to carry out the specific computations. Typically, the Logical Plan generates a list of attributes or columns as output under a set of constraints on the generated rows. The Physical Plan describes the computations on Datasets with specific definitions on how to execute them (it is executable).
Let's explore the transformation steps in more detail. The initial query plan is essentially an unresolved Logical Plan, that is, we don't know the source of the Datasets or the columns (contained in the Dataset) at this stage and we also don't know the types of columns. The first step in this pipeline is the analysis step. During analysis, the catalog information is used to convert the unresolved Logical Plan to a resolved Logical Plan.
In the next step, a set of logical optimization rules is applied to the resolved Logical Plan, resulting in an optimized Logical Plan. Then, during physical planning, the optimizer may generate multiple Physical Plans and compare their costs to pick the best one. The first version of the Cost-based Optimizer (CBO), built on top of Spark SQL, was released in Spark 2.2. More details on cost-based optimization are presented in Chapter 11, Tuning Spark SQL Components for Performance.
All three (DataFrame, Dataset, and SQL) share the same optimization pipeline, as illustrated in the following figure:
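The intermediate plans described above can be inspected through the queryExecution field of a DataFrame. The following is a minimal sketch that assumes a local session; the temporary view t and its three rows are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("plans-sketch").getOrCreate()
import spark.implicits._

// Hypothetical data registered as a temporary view.
Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "tag").createOrReplaceTempView("t")

val tags = spark.sql("SELECT tag FROM t WHERE id > 1")
val qe = tags.queryExecution

println(qe.logical)        // parsed Logical Plan, before the analysis step
println(qe.analyzed)       // resolved Logical Plan
println(qe.optimizedPlan)  // optimized Logical Plan
println(qe.sparkPlan)      // selected Physical Plan
println(qe.executedPlan)   // Physical Plan prepared for execution
```

The exact text of each plan varies between Spark versions, so the printed output is best read for its overall shape rather than compared verbatim.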
Understanding Catalyst optimizations
In Catalyst, there are two main types of optimizations: Logical and Physical:
Logical Optimizations: This includes the ability of the optimizer to push filter predicates down to the data source and enable execution to skip irrelevant data. For example, in the case of Parquet files, entire blocks can be skipped and comparisons on strings can be turned into cheaper integer comparisons via dictionary encoding. In the case of RDBMSs, the predicates are pushed down to the database to reduce the amount of data traffic.
Physical Optimizations: This includes the ability to choose intelligently between broadcast joins and shuffle joins to reduce network traffic, and to perform lower-level optimizations, such as eliminating expensive object allocations and reducing virtual function calls. Hence, performance typically improves when DataFrames are introduced in your programs.
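The choice between broadcast and shuffle joins is normally made by the planner, but it can also be influenced with a hint. The sketch below assumes a local session and two small hypothetical tables (bids and regions); it is meant to show the API and is not a tuning recommendation.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().master("local[2]").appName("broadcast-sketch").getOrCreate()
import spark.implicits._

// Hypothetical fact and dimension tables.
val bids = Seq((1, "r1"), (2, "r2"), (3, "r1")).toDF("bidid", "region")
val regions = Seq(("r1", "North"), ("r2", "South")).toDF("region", "regionName")

// The broadcast() hint asks the planner to ship the small table to every executor
// instead of shuffling both sides of the join.
val joined = bids.join(broadcast(regions), "region")

joined.explain()  // the plan is expected to show a broadcast join rather than a sort merge join

// Size limit (in bytes) below which Spark broadcasts a table automatically.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
```

Without the hint, Spark broadcasts a table on its own only when its estimated size is below the threshold printed on the last line.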
The Rule Executor is responsible for the analysis and logical optimization steps, while a set of strategies and the Rule Executor are responsible for the physical planning step. The Rule Executor transforms a tree to another of the same type by applying a set of rules in batches. These rules can be applied one or more times. Also, each of these rules is implemented as a transform. A transform is basically a function, associated with every tree, and is used to implement a single rule. In Scala terms, the transformation is defined as a partial function (a function defined for a subset of its possible arguments). These are typically defined as case statements to determine whether the partial function (using pattern matching) is defined for the given input.
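To make the idea of a rule as a partial function concrete, here is a toy sketch in plain Scala. The Expr classes and the transformUp helper are illustrative stand-ins written for this example; they are not Catalyst's own classes, although Catalyst's trees offer a similar style of pattern-matching transform.

```scala
// A toy expression tree; these classes are illustrative and are not Catalyst's own.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Attr(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Apply a rule bottom-up: rewrite the children first, then the node itself
// if the partial function is defined for it.
def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
  val withNewChildren = e match {
    case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
    case leaf => leaf
  }
  if (rule.isDefinedAt(withNewChildren)) rule(withNewChildren) else withNewChildren
}

// A rule written as case statements: fold the addition of two literals.
val foldConstants: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}

val tree = Add(Attr("x"), Add(Lit(1), Lit(2)))
val folded = transformUp(tree)(foldConstants)
println(folded)
```

The rule only states which pattern it rewrites; nodes that do not match are left untouched, which is what makes small rules easy to combine.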
The Rule Executor makes the Physical Plan ready for execution by preparing scalar subqueries, ensuring that the input rows meet the requirements of the specific operation and applying the physical optimizations. For example, in the sort merge join operations, the input rows need to be sorted as per the join condition. The optimizer inserts the appropriate sort operations, as required, on the input rows before the sort merge join operation is executed.
Understanding Catalyst transformations
Conceptually, the Catalyst optimizer executes two types of transformations. The first one converts an input tree type to the same tree type (that is, without changing the tree type). This type of transformation includes converting one expression to another expression, one Logical Plan to another Logical Plan, and one Physical Plan to another Physical Plan. The second type of transformation converts one tree type to another type, for example, from a Logical Plan to a Physical Plan. A Logical Plan is converted to a Physical Plan by applying a set of strategies. These strategies use pattern matching to convert a tree to the other type. For example, we have specific patterns for matching logical project and filter operators to physical project and filter operators, respectively.
A set of rules can also be combined into a single rule to accomplish a specific transformation. For example, depending on your query, predicates such as filter can be pushed down to reduce the overall number of rows before executing a join operation. In addition, if your query has an expression with constants, then the constant folding optimization computes the expression once at compile time instead of repeating it for every row at runtime. Furthermore, if your query requires a subset of columns, then column pruning can help reduce the columns to the essential ones. All these rules can be combined into a single rule to achieve all three transformations.
In the following example, we measure the difference in execution times on Spark 1.6 and Spark 2.2 using the iPinYou Real-Time Bidding Dataset for Computational Advertising Research. This Dataset contains the data from three seasons of the iPinYou global RTB bidding algorithm competition. You can download this Dataset from the data server at University College London at
First, we define the case classes for the records in the bid transactions and the region files:
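One way to see these rules at work is to compare the analyzed and optimized Logical Plans of a query that gives the optimizer something to do. The sketch below assumes a local session and two small hypothetical tables; because the inputs are tiny in-memory collections, the optimizer may simplify the plan further than it would for file-based sources, and the plan text varies by Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[2]").appName("optimizer-sketch").getOrCreate()
import spark.implicits._

// Hypothetical tables.
val bids = Seq((1, "r1", 10), (2, "r2", 20), (3, "r1", 30)).toDF("bidid", "region", "price")
val regions = Seq(("r1", "North"), ("r2", "South")).toDF("region", "regionName")

// The filter is written after the join, uses the constant expression 5 + 10,
// and the query finally keeps a single column.
val query = bids.join(regions, "region")
  .filter($"price" > (lit(5) + lit(10)))
  .select("regionName")

println(query.queryExecution.analyzed)       // the plan as written: join, then filter, then project
println(query.queryExecution.optimizedPlan)  // the plan after the logical optimization rules

val names = query.collect().map(_.getString(0)).sorted
println(names.mkString(", "))
```

When reading the two plans, look for the constant expression being replaced by its value and for the filter and the column selection moving closer to the data.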
scala> case class PinTrans(bidid: String, timestamp: String, ipinyouid: String, useragent: String, IP: String, region: String, city: String, adexchange: String, domain: String, url:String, urlid: String, slotid: String, slotwidth: String, slotheight: String, slotvisibility: String, slotformat: String, slotprice: String, creative: String, bidprice: String)
scala> case class PinRegion(region: String, regionName: String)
Next, we create the DataFrames from one of the bids files and the region file:
scala> val pintransDF = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/training1st/bid.20130314.txt").map(_.split("\t")).map(attributes => PinTrans(attributes(0).trim, attributes(1).trim, attributes(2).trim, attributes(3).trim, attributes(4).trim, attributes(5).trim, attributes(6).trim, attributes(7).trim, attributes(8).trim, attributes(9).trim, attributes(10).trim, attributes(11).trim, attributes(12).trim, attributes(13).trim, attributes(14).trim, attributes(15).trim, attributes(16).trim, attributes(17).trim, attributes(18).trim)).toDF()
scala> val pinregionDF = spark.sparkContext.textFile("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/region.en.txt").map(_.split("\t")).map(attributes => PinRegion(attributes(0).trim, attributes(1).trim)).toDF()
Next, we borrow a simple benchmark function (available in several Databricks sample notebooks) to measure the execution time:
scala> def benchmark(name: String)(f: => Unit): Unit = {
| val startTime = System.nanoTime
| f // run the code block being timed
| val endTime = System.nanoTime
| println(s"Time taken in $name: " + (endTime - startTime).toDouble / 1000000000 + " seconds")
| }
We use the SparkSession object to set the whole-stage code generation parameter off (this roughly translates to the Spark 1.6 environment). We also measure the execution time for a join operation between the two DataFrames:
scala> spark.conf.set("spark.sql.codegen.wholeStage", false)
scala> benchmark("Spark 1.6") {
| pintransDF.join(pinregionDF, "region").count()
| }
Time taken in Spark 1.6: 3.742190552 seconds
Next, we set the whole-stage code generation parameter to true and measure the execution time. We note that the execution time for the same code is roughly half of the earlier figure in Spark 2.2:
scala> spark.conf.set("spark.sql.codegen.wholeStage", true)
scala> benchmark("Spark 2.2") {
| pintransDF.join(pinregionDF, "region").count()
| }
Time taken in Spark 2.2: 1.881881579 seconds
We use the explain() function to print out the various stages in the Catalyst transformations pipeline. We will explain the following output in more detail in Chapter 11, Tuning Spark SQL Components for Performance:
scala> pintransDF.join(pinregionDF, "region").selectExpr("count(*)").explain(true)
In the next section, we present developer-relevant details of Project Tungsten.
Introducing Project Tungsten
Project Tungsten was touted as the largest change to Spark's execution engine since the project's inception. The motivation for Project Tungsten was the observation that CPU and memory, rather than I/O and network, were the bottlenecks in a majority of Spark workloads.
The CPU is the bottleneck now because of the improvements in hardware (for example, SSDs and striped HDD arrays for storage), optimizations done to Spark's I/O (for example, shuffle and network layer implementations, input data pruning for disk I/O reduction, and so on) and improvements in data formats (for example, columnar formats like Parquet, binary data formats, and so on). In addition, large-scale serialization and hashing tasks in Spark are CPU-bound operations.
Spark 1.x used a query evaluation strategy based on an iterator model (referred to as the Volcano model). As each operator in a query presented an interface that returned a tuple at a time to the next operator in the tree, this interface allowed query execution engines to compose arbitrary combinations of operators. Before Spark 2.0, a majority of the CPU cycles were spent in useless work, such as making virtual function calls or reading/writing intermediate data to CPU cache or memory.
Project Tungsten focuses on three areas to improve the efficiency of memory and CPU to push the performance closer to the limits of the underlying hardware. These three areas are memory management and binary processing, cache-aware computation, and code generation. Additionally, the second generation Tungsten execution engine, integrated in Spark 2.0, uses a technique called whole-stage code generation. This technique enables the engine to eliminate virtual function dispatches and move intermediate data from memory to CPU registers, and exploits the modern CPU features through loop unrolling and SIMD. In addition, the Spark 2.0 engine also speeds up operations considered too complex for code generation by employing another technique, called vectorization.
Whole-stage code generation collapses the entire query into a single function. Further, it eliminates virtual function calls and uses CPU registers for storing intermediate data. This in turn, significantly improves CPU efficiency and runtime performance. It achieves the performance of hand-written code, while continuing to remain a general-purpose engine.
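The contrast between the iterator model and a single collapsed function can be illustrated with a toy sketch in plain Scala. This is not the code that Spark generates; the Operator trait, the Scan and FilterEven classes, and both counting functions are hypothetical and exist only to show the two shapes of execution for a query that counts the even numbers in a range.

```scala
// Toy operators following the iterator (Volcano) model: each call to next()
// crosses an operator boundary through a virtual call.
trait Operator { def hasNext: Boolean; def next(): Long }

class Scan(n: Long) extends Operator {
  private var i = 0L
  def hasNext: Boolean = i < n
  def next(): Long = { i += 1; i }
}

class FilterEven(child: Operator) extends Operator {
  private var buffered: Option[Long] = None
  private def advance(): Unit =
    while (buffered.isEmpty && child.hasNext) {
      val v = child.next()
      if (v % 2 == 0) buffered = Some(v)
    }
  def hasNext: Boolean = { advance(); buffered.nonEmpty }
  def next(): Long = { advance(); val v = buffered.get; buffered = None; v }
}

def volcanoCount(n: Long): Long = {
  val plan = new FilterEven(new Scan(n))
  var count = 0L
  while (plan.hasNext) { plan.next(); count += 1 }
  count
}

// The kind of code a person would write by hand for the same query: one loop,
// with the intermediate values held in local variables.
def fusedCount(n: Long): Long = {
  var count = 0L
  var i = 1L
  while (i <= n) { if (i % 2 == 0) count += 1; i += 1 }
  count
}

println(volcanoCount(1000000L))
println(fusedCount(1000000L))
```

Both functions compute the same answer; the second avoids the per-row calls between operators, which is the effect whole-stage code generation aims for.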
In vectorization, the engine batches multiple rows together in a columnar format and each operator iterates over the data within a batch. However, it still requires putting intermediate data in memory rather than keeping it in CPU registers. As a result, vectorization is only used when it is not possible to do whole-stage code generation.
Tungsten memory management improvements focus on storing Java objects in a compact binary format to reduce GC overhead, using a denser in-memory data format to reduce spillovers (for example, the Parquet format), and allowing operators that understand data types (in the case of DataFrames, Datasets, and SQL) to work directly against the binary format in memory rather than serializing and deserializing the data.
Code generation exploits modern compilers and CPUs for implementing improvements. These include faster expression evaluation and DataFrame/SQL operators, and a faster serializer. Generic evaluation of expressions is very expensive on the JVM, due to virtual function calls, branches based on expression type, object creation, and memory consumption due to primitive boxing. By generating custom bytecode on the fly, these overheads are largely eliminated.
Here, we present the Physical Plan for our join operation between the bids and the region DataFrames from the preceding section with whole-stage code generation enabled. In the explain() output, an operator marked with a star (*) has whole-stage code generation enabled. In the following physical plan, this includes the Aggregate, Project, SortMergeJoin, Filter, and Sort operators. Exchange, however, does not implement whole-stage code generation because it sends data across the network:
scala> pintransDF.join(pinregionDF, "region").selectExpr("count(*)").explain()
Project Tungsten hugely benefits DataFrames and Datasets (for all programming APIs: Java, Scala, Python, and R) and Spark SQL queries. Also, for many of the data processing operators, the new engine is orders of magnitude faster.
In the next section, we shift our focus to a new Spark 2.0 feature, called Structured Streaming, that supports Spark-based streaming applications.