Spark SQL is the Spark module for structured data processing. The main difference between this API and the RDD API is that the Spark SQL interfaces provide more information about the structure of both the data and the computation being performed. Spark uses this extra information internally to apply additional optimizations through the Catalyst optimization engine, which is the same execution engine used regardless of which API or programming language is involved.
Spark SQL is commonly used to execute SQL queries (even if this isn't the only way to use it). Whichever Spark-supported programming language encapsulates the SQL code to be executed, the results of a query are returned as a Dataset. A Dataset is a distributed collection of data that was added as an interface in Spark 1.6. It combines the benefits of RDDs (such as strong typing and the ability to apply useful lambda functions) with the benefits of Spark SQL's optimized execution engine (Catalyst, https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html). You can construct a Dataset by starting with Java/Scala objects and then manipulating it through the usual functional transformations. The Dataset API is available in Scala and Java, but not in Python. However, due to the dynamic nature of that language, many of the benefits of the Dataset API are already available in Python.
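To make the strong typing and lambda support concrete, here is a minimal sketch (assuming a Scala Spark shell, where the spark session and its implicits are available, and a hypothetical Person case class) that builds a typed Dataset from plain Scala objects and transforms it with ordinary lambdas:
import spark.implicits._
// Hypothetical case class describing the records we want typed at compile time
case class Person(name: String, age: Long)
// Build a typed Dataset[Person] from plain Scala objects
val peopleDS = Seq(Person("Michael", 29), Person("Andy", 30), Person("Justin", 19)).toDS()
// Lambdas operate on Person instances, so field access is checked at compile time
peopleDS.filter(p => p.age > 20).map(p => p.name).show()
This gives compile-time type safety on top of the same optimized execution engine that backs the rest of Spark SQL.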
Starting from Spark 2.0, the DataFrame and Dataset APIs have been merged into the Dataset API, so a DataFrame is just a Dataset organized into named columns. It is conceptually equivalent to a table in an RDBMS, but with better optimizations under the hood (being part of the Dataset API, the Catalyst optimization engine works behind the scenes for DataFrames, too). You can construct a DataFrame from diverse sources, such as structured data files, Hive tables, database tables, and RDDs, to name a few. Unlike the Dataset API, the DataFrame API is available in any of the programming languages supported by Spark.
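For example, one of those sources is an existing RDD. The following sketch (again assuming a Scala Spark shell and the hypothetical Person case class from the previous sketch) converts an RDD of case class instances into a DataFrame with named columns; creating a DataFrame from a JSON file is shown next:
import spark.implicits._
case class Person(name: String, age: Long)
// Start from an RDD of Scala objects...
val peopleRDD = spark.sparkContext.parallelize(Seq(Person("Michael", 29), Person("Andy", 30)))
// ...and convert it to a DataFrame whose column names come from the case class fields
val peopleDF = peopleRDD.toDF()
peopleDF.printSchema()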
Let's get hands-on so that we can better understand the concepts behind Spark SQL. The first full example I am going to show is Scala-based: start a Scala Spark shell to run the following code interactively.
Let's use people.json as a data source. This file is one of the resources shipped with the Spark distribution and can be used to create a DataFrame, which is a Dataset of Rows (http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Row):
val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
You can print the content of the DataFrame to the console to check that it is what you expected:
scala> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Before performing DataFrame operations, you need to import the implicit conversions (such as converting RDDs to DataFrames), which also enable the $ column notation:
import spark.implicits._
Now, you can print the DataFrame schema in a tree format:
scala> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
Select a single column (let's say name):
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
Filter the data:
scala> df.filter($"age" > 27).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
Then, group the rows by age and count them:
scala> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
Select all rows and increment a numeric field:
scala> df.select($"name", $"age" + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
It is possible to run SQL queries programmatically through the sql function of SparkSession. This function returns the results of the query as a DataFrame, which, in Scala, is a Dataset[Row]. Let's consider the same DataFrame as in the previous example:
val df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
You can register it as an SQL temporary view:
df.createOrReplaceTempView("people")
Then, you can execute an SQL query against it:
scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
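Since the returned DataFrame is a Dataset[Row], you can also move from the untyped result back to a strongly typed Dataset. Here is a minimal sketch, assuming a hypothetical Person case class whose fields match the columns of people.json (age is declared as Option[Long] because it can be null):
// Hypothetical case class matching the people.json columns; Option handles the null age
case class Person(name: String, age: Option[Long])
// Map the Dataset[Row] to a Dataset[Person] by column name
val typedPeople = sqlDF.as[Person]
typedPeople.filter(p => p.age.exists(_ > 27)).show()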
The same operations can be performed in Python as well:
>>> df = spark.read.json("/opt/spark/spark-2.2.1-bin-hadoop2.7/examples/src/main/resources/people.json")
Then, display its content:
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
>>> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
>>> df.filter(df['age'] > 21).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
>>> df.select(df['name'], df['age'] + 1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
>>> df.createOrReplaceTempView("people")
>>> sqlDF = spark.sql("SELECT * FROM people")
>>> sqlDF.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Other features of Spark SQL and Datasets (data sources, aggregations, self-contained applications, and so on) will be covered in Chapter 3, Extract, Transform, Load.