[box type="note" align="" class="" width=""]This article is a book excerpt from Learning Spark SQL written by Aurobindo Sarkar. This book gives an insight into the engineering practices used to design and build real-world, Spark-based applications. The hands-on examples in the book will give you the confidence required to work on future Spark SQL projects. [/box]
In this article, we will talk about Spark SQL and its use in streaming applications.
A streaming application is a program that continuously processes data as it arrives, typically an unbounded sequence of events, instead of processing a fixed dataset once. Streaming applications are getting increasingly complex, because such computations don't run in isolation. They need to interact with batch data, support interactive analysis, support sophisticated machine learning applications, and so on. Typically, such applications store incoming event streams in long-term storage, continuously monitor events, and run machine learning models on the stored data, while simultaneously enabling continuous learning on the incoming stream. They can also interactively query the stored data while providing exactly-once write guarantees, handling late-arriving data, performing aggregations, and so on. These types of applications are much more than mere streaming applications and have, therefore, been termed continuous applications.
Before Spark 2.0, streaming applications were built on the concept of DStreams, and there were several pain points associated with using them. In DStreams, an event's timestamp was the time at which it arrived in the Spark system; the time embedded in the event itself was not taken into consideration. In addition, although the same engine could process both batch and streaming computations, and the APIs for RDDs (batch) and DStreams (streaming) were similar, moving between them still required the developer to make code changes. The DStream model also placed the burden of handling various failure conditions on the developer, which made it hard to reason about data consistency. Spark 2.0 introduced Structured Streaming to deal with all of these pain points.
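To make the first pain point concrete, here is a small plain-Scala sketch (no Spark required; the `Event` type, its fields, and the timestamps are hypothetical) that counts events per 10-second tumbling window twice: once by arrival time, which is effectively what DStreams did, and once by the event time embedded in each record.

```scala
object EventTimeVsArrival {
  // Hypothetical event: eventTime is the timestamp embedded in the event,
  // arrivalTime is when the engine received it (both in seconds).
  final case class Event(id: String, eventTime: Long, arrivalTime: Long)

  // Counts events per tumbling window of `size` seconds; `ts` chooses which
  // timestamp decides the window an event falls into.
  def windowCounts(events: Seq[Event], size: Long)(ts: Event => Long): Map[Long, Int] =
    events.groupBy(e => (ts(e) / size) * size).map { case (start, es) => start -> es.size }

  val events: Seq[Event] = Seq(
    Event("a", eventTime = 5L, arrivalTime = 6L),
    Event("b", eventTime = 8L, arrivalTime = 9L),
    Event("c", eventTime = 9L, arrivalTime = 14L) // generated at 9 s, arrived late at 14 s
  )

  // DStream-style: the window is decided by arrival time.
  val byArrival: Map[Long, Int] = windowCounts(events, 10L)(_.arrivalTime)

  // Event-time style: the window is decided by the time inside the event.
  val byEventTime: Map[Long, Int] = windowCounts(events, 10L)(_.eventTime)
}
```

The late event `c` lands in a different window when arrival time is used. In Structured Streaming, you would instead group by a window over an event-time column, for example with the `window` function from `org.apache.spark.sql.functions`.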
Structured Streaming is a fast, fault-tolerant, exactly-once, stateful stream processing approach. It enables streaming analytics without requiring you to reason about the underlying mechanics of streaming. In the new model, the input can be thought of as an append-only table that grows continuously. A trigger specifies the time interval at which the input is checked for newly arrived data. As shown in the following figure, the query represents the operations, such as map, filter, and reduce, applied to the input, and the result is the table that is updated at each trigger interval according to the specified operations. The output defines the part of the result that is written to the data sink in each interval.
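The table analogy can be modeled in a few lines of plain Scala (no Spark; the sample values are hypothetical). Each trigger appends newly arrived rows to the input table, and the result table is, conceptually, the query applied to everything received so far.

```scala
object IncrementalModel {
  // Data arriving between triggers; each inner Seq is appended to the
  // input table when the next trigger fires (hypothetical sample values).
  val arrivals: Seq[Seq[String]] =
    Seq(Seq("cat", "dog"), Seq("dog"), Seq("cat", "cat", "bird"))

  // The query: a count per value over everything in the input table.
  def query(input: Seq[String]): Map[String, Int] =
    input.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // One result table per trigger: the query applied to the input table
  // as it stands after the first i + 1 arrivals have been appended.
  def resultsPerTrigger(arrivals: Seq[Seq[String]]): Seq[Map[String, Int]] =
    arrivals.indices.map(i => query(arrivals.take(i + 1).flatten))
}
```

This sketch recomputes the query from scratch at every trigger to show the semantics; the point of the Structured Streaming engine is that it produces the same result tables without rescanning the whole input.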
The output mode can be complete, delta, or append. The complete output mode writes the full result table every time, the delta output mode (called update mode in the Spark API) writes only the rows that changed since the previous batch, and the append output mode writes only the new rows.
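The three modes can be sketched as functions from the previous and current result tables to the rows written to the sink. This is a simplified model, not Spark's implementation. In particular, real append mode only emits rows that can no longer change, which is why Spark rejects append mode for streaming aggregations that have no watermark; the sketch therefore demonstrates append on a result table that only grows, such as the output of a filter.

```scala
object OutputModes {
  type Counts = Map[String, Int]

  // Complete mode: the full result table is written at every trigger.
  def complete(prev: Counts, curr: Counts): Counts = curr

  // Delta mode (OutputMode.Update in the Spark API): only rows that are new
  // or whose value changed since the previous trigger.
  def delta(prev: Counts, curr: Counts): Counts =
    curr.filter { case (k, v) => !prev.get(k).contains(v) }

  // Append mode: only rows added to the result table since the previous
  // trigger, assuming existing rows never change.
  def append[A](prev: Seq[A], curr: Seq[A]): Seq[A] = curr.drop(prev.size)
}
```

For example, if the counts go from `cat -> 1, dog -> 1` to `cat -> 1, dog -> 2, bird -> 1`, complete mode writes all three rows while delta mode writes only `dog -> 2` and `bird -> 1`.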
In Spark 2.0, in addition to static, bounded DataFrames, we have the concept of a continuous, unbounded DataFrame. Both static and continuous DataFrames use the same API, thereby unifying streaming, interactive, and batch queries. For example, you can aggregate data in a stream and then serve it using JDBC. The high-level streaming API is built on the Spark SQL engine and is tightly integrated with SQL queries and the DataFrame/Dataset APIs. The primary benefit is that you use the same high-level Spark DataFrame and Dataset APIs, and the Spark engine figures out the incremental and continuous execution required for the operations. Additionally, query management APIs let you manage multiple concurrently running streaming queries; for instance, you can list running queries, stop and restart queries, and retrieve exceptions in case of failures.
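The unification works because a query written once can be executed either over a bounded dataset or incrementally over micro-batches with running state. The plain-Scala sketch below is a simplified model of that idea, not Spark's planner (all names are hypothetical): it writes a count query once, runs it both ways, and the two results agree.

```scala
object SameQueryTwoWays {
  type Counts = Map[String, Int]

  // The query logic, written once: count occurrences of each key in a batch.
  def countBatch(rows: Seq[String]): Counts =
    rows.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // Merges one batch's partial counts into the running state; this is the
  // extra step an incremental execution needs for an aggregation.
  def merge(state: Counts, partial: Counts): Counts =
    partial.foldLeft(state) { case (acc, (k, n)) => acc.updated(k, acc.getOrElse(k, 0) + n) }

  // Static (bounded) execution: run the query once over all of the data.
  def runStatic(all: Seq[String]): Counts = countBatch(all)

  // Incremental (unbounded) execution: run the same query per micro-batch
  // and fold the partial results into the running state.
  def runIncremental(batches: Seq[Seq[String]]): Counts =
    batches.foldLeft(Map.empty[String, Int])((state, b) => merge(state, countBatch(b)))
}
```

In Spark itself, the query management APIs mentioned above are exposed through `spark.streams` (a `StreamingQueryManager`): `spark.streams.active` lists the running queries, and each `StreamingQuery` handle offers `stop()` and `exception`.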
In the example code below, we use two bid files from the iPinYou dataset as the source of our streaming data. First, we define the schema for our input records and create a streaming input DataFrame:
scala> import org.apache.spark.sql.types._
scala> import org.apache.spark.sql.functions._
scala> import scala.concurrent.duration._
scala> import org.apache.spark.sql.streaming.ProcessingTime
scala> import org.apache.spark.sql.streaming.OutputMode.Complete
scala> val bidSchema = new StructType().add("bidid", StringType).
  add("timestamp", StringType).add("ipinyouid", StringType).
  add("useragent", StringType).add("IP", StringType).
  add("region", IntegerType).add("city", IntegerType).
  add("adexchange", StringType).add("domain", StringType).
  add("url", StringType).add("urlid", StringType).
  add("slotid", StringType).add("slotwidth", StringType).
  add("slotheight", StringType).add("slotvisibility", StringType).
  add("slotformat", StringType).add("slotprice", StringType).
  add("creative", StringType).add("bidprice", StringType)
scala> val streamingInputDF = spark.readStream.format("csv").
  schema(bidSchema).option("header", false).option("inferSchema", true).
  option("sep", "\t").option("maxFilesPerTrigger", 1).
  load("file:///Users/aurobindosarkar/Downloads/make-ipinyou-datamaster/original-data/ipinyou.contest.dataset/bidfiles")
Next, we define a query that counts bids per city, with a trigger interval of 20 seconds and the Complete output mode:
scala> val streamingCountsDF = streamingInputDF.groupBy($"city").count()
scala> val query = streamingCountsDF.writeStream.format("console").
  trigger(ProcessingTime(20.seconds)).queryName("counts").
  outputMode(Complete).start()
In the console output, the bid count for each city is updated at each trigger interval as new data arrives. To see the results change, copy new bid files from the original dataset into the bidfiles directory. Alternatively, start with several bid files in the directory: because maxFilesPerTrigger is set to 1, they are picked up and processed one file per trigger.
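The effect of maxFilesPerTrigger can be pictured with a small plain-Scala model of a file source (the `FileEntry` type and file names are hypothetical). At each trigger it takes the files it has not seen yet, oldest first, which roughly mirrors the default ordering of Spark's file source, and caps the batch at `maxFiles`.

```scala
object FileSourceSketch {
  // Hypothetical directory-listing entry: file name and modification time (ms).
  final case class FileEntry(name: String, modTime: Long)

  // Picks the next batch: unseen files, oldest first, at most maxFiles of them.
  def nextBatch(listing: Seq[FileEntry], seen: Set[String], maxFiles: Int): Seq[FileEntry] =
    listing.filterNot(f => seen(f.name)).sortBy(_.modTime).take(maxFiles)

  // Fires triggers until no unseen files remain; returns the file names per batch.
  def runAll(listing: Seq[FileEntry], maxFiles: Int): Seq[Seq[String]] = {
    @annotation.tailrec
    def loop(seen: Set[String], acc: Vector[Seq[String]]): Vector[Seq[String]] = {
      val batch = nextBatch(listing, seen, maxFiles)
      if (batch.isEmpty) acc
      else loop(seen ++ batch.map(_.name), acc :+ batch.map(_.name))
    }
    loop(Set.empty, Vector.empty)
  }
}
```

With two files and `maxFiles = 1`, the sketch produces two batches of one file each; with `maxFiles = 2`, both files are processed in a single batch.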
Structured Streaming polls the sources for new data and incrementally executes the computation on it before writing the results to the sink. In addition, any running aggregates required by your application are maintained as in-memory state backed by a Write-Ahead Log (WAL). This in-memory state data is generated and used across incremental executions. The fault-tolerance requirements for such applications include the ability to recover and replay all data and metadata in the system. To meet them, the planner writes offsets to a fault-tolerant WAL on persistent storage, such as HDFS, before execution, as illustrated in the figure:
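The key ordering is that the offset range is logged before the batch runs. The following toy model in plain Scala illustrates it; the `Entry` and `Wal` types are hypothetical, and Spark keeps its real offset log in the query's checkpoint directory. On recovery, the sketch simply re-executes the last logged range, relying on idempotent sinks.

```scala
object OffsetWal {
  // One log entry: a batch id and the half-open offset range [start, end).
  final case class Entry(batchId: Long, start: Long, end: Long)

  // Toy stand-in for a log on persistent storage (HDFS in the text).
  final class Wal {
    private val entries = scala.collection.mutable.ArrayBuffer.empty[Entry]
    def write(e: Entry): Unit = { entries += e; () }
    def last: Option[Entry] = entries.lastOption
  }

  // Planner step: decide the next offset range and log it BEFORE executing.
  def planNext(wal: Wal, latestAvailable: Long): Option[Entry] = {
    val start = wal.last.map(_.end).getOrElse(0L)
    if (latestAvailable <= start) None // no new data since the last batch
    else {
      val entry = Entry(wal.last.map(_.batchId + 1).getOrElse(0L), start, latestAvailable)
      wal.write(entry)
      Some(entry)
    }
  }

  // Recovery: a restarted planner reads the log and re-executes exactly the
  // last logged offset range.
  def recover(wal: Wal): Option[Entry] = wal.last
}
```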
If the planner fails during the current incremental execution, the restarted planner reads from the WAL and re-executes the exact range of offsets required. Sources such as Kafka are typically also fault-tolerant and can regenerate the original transaction data, given the appropriate offsets recovered by the planner. The state data is usually maintained in a versioned key-value map in the Spark workers and is backed by a WAL on HDFS. The planner ensures that the correct version of the state is used to re-execute the transactions after a failure. Additionally, the sinks are designed to be idempotent, so they can handle re-executions without committing the output twice. Hence, the combination of offset tracking in the WAL, state management, and fault-tolerant sources and sinks provides end-to-end exactly-once guarantees.
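The recovery story can be put together in a plain-Scala sketch. The `StateStore`, `Sink`, and `execute` names are hypothetical, and this is a simplified model rather than Spark's state store or sink interfaces. A batch is re-executed after a simulated failure; because it reads state version `batchId - 1` rather than the latest version, and the sink ignores a batch id it has already committed, the replay neither double-counts the state nor duplicates the output.

```scala
object ExactlyOnceSketch {
  type Counts = Map[String, Int]

  // Versioned state store: version n holds the state after batch n.
  final class StateStore {
    private val versions = scala.collection.mutable.Map[Long, Counts](-1L -> Map.empty[String, Int])
    def get(version: Long): Counts = versions(version)
    def put(version: Long, state: Counts): Unit = versions(version) = state
  }

  // Idempotent sink: a batch id that was already committed is ignored.
  final class Sink {
    private val committed = scala.collection.mutable.LinkedHashMap.empty[Long, Counts]
    def commit(batchId: Long, output: Counts): Unit =
      if (!committed.contains(batchId)) committed(batchId) = output
    def outputs: Seq[Counts] = committed.values.toList
  }

  // Executes one batch: read state version batchId - 1, apply the batch's
  // data, store version batchId, and commit the new result to the sink.
  def execute(batchId: Long, data: Seq[String], store: StateStore, sink: Sink): Unit = {
    val prev = store.get(batchId - 1)
    val next = data.foldLeft(prev)((s, k) => s.updated(k, s.getOrElse(k, 0) + 1))
    store.put(batchId, next)
    sink.commit(batchId, next)
  }
}
```

Calling `execute` twice for the same batch id, as a restarted planner would after reading the WAL, leaves both the state and the sink output exactly as they were after the first run.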
Spark SQL provides one of the best platforms for implementing streaming applications. Its internal architecture and fault-tolerant behavior mean that modern developers who want to build data-intensive applications with streaming capabilities should harness the power of Spark SQL.
If you liked this post, be sure to check out Learning Spark SQL, which covers more useful techniques for data extraction and data analysis using Spark SQL.