Learning Spark SQL

You're reading from Learning Spark SQL: Architect streaming analytics and machine learning solutions

Product type Paperback
Published in Sep 2017
Publisher Packt
ISBN-13 9781785888359
Length 452 pages
Edition 1st Edition
Author (1):
Aurobindo Sarkar

Using Spark SQL in streaming applications

Streaming applications are becoming increasingly complex because their computations do not run in isolation. They need to interact with batch data, support interactive analysis, support sophisticated machine learning applications, and so on. Typically, such applications store incoming event streams on long-term storage, continuously monitor events, and run machine learning models on the stored data, while simultaneously enabling continuous learning on the incoming stream. They can also query the stored data interactively while providing exactly-once write guarantees, handling late-arriving data, performing aggregations, and so on. Because they are much more than mere streaming applications, they have been termed continuous applications.

Before Spark 2.0, streaming applications were built on the concept of DStreams, which had several pain points. In DStreams, the timestamp was the time at which the event arrived in the Spark system; the time embedded in the event itself was not taken into consideration. In addition, although the same engine could process both batch and streaming computations, the APIs involved, though similar between RDDs (batch) and DStreams (streaming), still required the developer to make code changes. The DStream model also placed the burden of handling various failure conditions on the developer, and it was hard to reason about data consistency issues. Structured Streaming was introduced in Spark 2.0 to address all of these pain points.
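To make the difference between arrival time and event time concrete, the following is a minimal sketch in plain Scala (it does not use the Spark API). The Event class, the windowStart and countsPerWindow helpers, and the sample timestamps are all hypothetical and exist only for illustration. The same three events are counted in 10-second windows, once by the time they arrived and once by the time embedded in the event:

```scala
// Toy model: each event carries its own timestamp (event time) and is
// stamped again when the system receives it (arrival time). Times are in seconds.
final case class Event(id: String, eventTime: Long, arrivalTime: Long)

// Start of the fixed-size window that contains time t.
def windowStart(t: Long, windowSeconds: Long): Long = t - (t % windowSeconds)

// Count events per window, using whichever timestamp `timeOf` selects.
def countsPerWindow(events: Seq[Event], windowSeconds: Long)(timeOf: Event => Long): Map[Long, Int] =
  events.groupBy(e => windowStart(timeOf(e), windowSeconds)).map { case (w, es) => w -> es.size }

val events = Seq(
  Event("a", eventTime = 5L, arrivalTime = 6L),
  Event("b", eventTime = 8L, arrivalTime = 12L), // delayed: arrives during the next window
  Event("c", eventTime = 14L, arrivalTime = 15L)
)

// Grouping by arrival time puts the delayed event "b" in the wrong window.
val byArrival = countsPerWindow(events, 10L)(_.arrivalTime)
// Grouping by event time assigns "b" to the window in which it actually occurred.
val byEvent = countsPerWindow(events, 10L)(_.eventTime)
```

Here the delayed event is counted in the window starting at 10 when arrival time is used, and in the window starting at 0 when event time is used.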

Structured Streaming is a fast, fault-tolerant, exactly-once stateful stream processing approach. It enables streaming analytics without having to reason about the underlying mechanics of streaming. In the new model, the input can be thought of as data from an append-only table that grows continuously. A trigger specifies the time interval at which the input is checked for new data. As shown in the following figure, the query represents the operations applied to the input, such as map, filter, and reduce, and the result represents the final table that is updated in each trigger interval as per the specified operations. The output defines the part of the result to be written to the data sink in each time interval.

The output mode can be complete, delta, or append. The complete output mode writes the full result table every time, the delta output mode writes only the rows that changed since the previous batch, and the append output mode writes only the new rows. In the Structured Streaming API, the delta mode is exposed as the update output mode.
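The three output modes can be sketched with a toy model in plain Scala. This is not the Spark API: the ResultTable alias and the applyBatch, complete, update, and append functions are hypothetical names chosen for illustration, with update playing the role of the delta mode. The model is deliberately naive about append: it simply treats rows that were absent from the previous result table as new, whereas Spark restricts append on aggregated results to rows that can no longer change.

```scala
// Toy model of the three output modes for a running count per key.
// Plain Scala for illustration only; none of this is the Spark API.
type ResultTable = Map[String, Long]

// Fold one micro-batch of keys into the running result table.
def applyBatch(table: ResultTable, batch: Seq[String]): ResultTable =
  batch.foldLeft(table)((t, k) => t.updated(k, t.getOrElse(k, 0L) + 1L))

// Complete: the whole result table is written on every trigger.
def complete(before: ResultTable, after: ResultTable): ResultTable = after

// Update (delta): only rows whose value differs from the previous trigger.
def update(before: ResultTable, after: ResultTable): ResultTable =
  after.filter { case (k, v) => !before.get(k).contains(v) }

// Append (naive): only rows that did not exist in the previous result table.
def append(before: ResultTable, after: ResultTable): ResultTable =
  after.filter { case (k, _) => !before.contains(k) }

val t0: ResultTable = Map.empty
val t1 = applyBatch(t0, Seq("cityA", "cityB", "cityA")) // first trigger
val t2 = applyBatch(t1, Seq("cityB", "cityC"))          // second trigger

val writtenComplete = complete(t1, t2)
val writtenUpdate = update(t1, t2)
val writtenAppend = append(t1, t2)
```

After the second trigger, the complete mode writes all three rows, the update mode writes only the changed cityB row and the new cityC row, and the append mode writes only the cityC row.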

In Spark 2.0, in addition to the static bounded DataFrames, we have the concept of a continuous unbounded DataFrame. Both static and continuous DataFrames use the same API, thereby unifying streaming, interactive, and batch queries. For example, you can aggregate data in a stream and then serve it using JDBC. The high-level streaming API is built on the Spark SQL engine and is tightly integrated with SQL queries and the DataFrame/Dataset APIs. The primary benefit is that you use the same high-level Spark DataFrame and Dataset APIs, and the Spark engine figures out the incremental and continuous execution required for operations.
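The idea that one query definition can serve both a bounded and an unbounded input can be illustrated with a small sketch in plain Scala. This is only an analogy and not how the Spark engine is implemented; countByCity, mergeCounts, and the sample data are hypothetical. The same counting logic is applied once to the whole input and once incrementally, one micro-batch at a time, and both paths arrive at the same result table:

```scala
// One query definition, written against an ordinary collection of city keys.
def countByCity(rows: Seq[String]): Map[String, Long] =
  rows.groupBy(identity).map { case (city, rs) => city -> rs.size.toLong }

// Incremental execution: merge the counts of a new micro-batch into the running result.
def mergeCounts(acc: Map[String, Long], delta: Map[String, Long]): Map[String, Long] =
  delta.foldLeft(acc) { case (m, (k, v)) => m.updated(k, m.getOrElse(k, 0L) + v) }

// Three micro-batches, as they might arrive over three trigger intervals.
val microBatches = Seq(
  Seq("cityA", "cityB"),
  Seq("cityA"),
  Seq("cityC", "cityB", "cityA")
)

// Batch view: run the query once over all of the input.
val batchResult = countByCity(microBatches.flatten)

// Streaming view: run the query on each micro-batch and fold the partial results together.
val incrementalResult =
  microBatches.map(countByCity).foldLeft(Map.empty[String, Long])(mergeCounts)
```

In this sketch the developer writes the merge step by hand; with Structured Streaming, that incremental bookkeeping is what the engine takes over.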

Additionally, there are query management APIs that you can use to manage multiple concurrently running streaming queries. For instance, you can list running queries, stop and restart queries, retrieve exceptions in case of failures, and so on. We cover Structured Streaming in more detail in Chapter 5, Using Spark SQL in Streaming Applications.

In the example code below, we use two bid files from the iPinYou Dataset as the source for our streaming data. First, we define our input records schema and create a streaming input DataFrame:

scala> import org.apache.spark.sql.types._ 
scala> import org.apache.spark.sql.functions._
scala> import scala.concurrent.duration._
scala> import org.apache.spark.sql.streaming.ProcessingTime
scala> import org.apache.spark.sql.streaming.OutputMode.Complete

scala> val bidSchema = new StructType().add("bidid", StringType).add("timestamp", StringType).add("ipinyouid", StringType).add("useragent", StringType).add("IP", StringType).add("region", IntegerType).add("city", IntegerType).add("adexchange", StringType).add("domain", StringType).add("url", StringType).add("urlid", StringType).add("slotid", StringType).add("slotwidth", StringType).add("slotheight", StringType).add("slotvisibility", StringType).add("slotformat", StringType).add("slotprice", StringType).add("creative", StringType).add("bidprice", StringType)

scala> val streamingInputDF = spark.readStream.format("csv").schema(bidSchema).option("header", false).option("sep", "\t").option("maxFilesPerTrigger", 1).load("file:///Users/aurobindosarkar/Downloads/make-ipinyou-data-master/original-data/ipinyou.contest.dataset/bidfiles")

Next, we define our query with a time interval of 20 seconds and the output mode as Complete:

scala> val streamingCountsDF = streamingInputDF.groupBy($"city").count() 

scala> val query = streamingCountsDF.writeStream.format("console").trigger(ProcessingTime(20.seconds)).queryName("counts").outputMode(Complete).start()

In the output, you will observe that the count of bids from each city gets updated in each time interval as new data arrives. To see the updated results, drop new bid files from the original dataset into the bidfiles directory (or start with multiple bid files; they are picked up for processing one at a time, based on the value of maxFilesPerTrigger).

You can also query the system for active streams, as follows:

scala> spark.streams.active.foreach(println) 
Streaming Query - counts [state = ACTIVE]

Finally, you can stop the execution of your streaming application using the stop() method, as shown:

//Execute the stop() function after you have finished executing the code in the next section.
scala> query.stop()

In the next section, we conceptually describe how Structured Streaming works internally.

Understanding Structured Streaming internals

To enable the Structured Streaming functionality, the planner polls the sources for new data and incrementally executes the computation on it before writing the results to the sink. In addition, any running aggregates required by your application are maintained as in-memory state backed by a Write-Ahead Log (WAL). This in-memory state is generated and used across incremental executions. The fault tolerance requirements for such applications include the ability to recover and replay all data and metadata in the system. The planner writes offsets to a fault-tolerant WAL on persistent storage, such as HDFS, before execution, as illustrated in the figure.

If the planner fails during the current incremental execution, the restarted planner reads from the WAL and re-executes the exact range of offsets required. Typically, sources such as Kafka are also fault-tolerant and can regenerate the original transaction data, given the appropriate offsets recovered by the planner. The state data is usually maintained in a versioned key-value map in Spark workers and is backed by a WAL on HDFS. The planner ensures that the correct version of the state is used to re-execute the transactions after a failure. Additionally, the sinks are idempotent by design and can handle re-executions without double commits of the output. Hence, the combination of offset tracking in the WAL, state management, and fault-tolerant sources and sinks provides the end-to-end exactly-once guarantees.
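The interplay between offset logging, a replayable source, and an idempotent sink can be sketched with a toy model in plain Scala. None of the classes below (OffsetRange, ReplayableSource, IdempotentSink, Planner) are Spark APIs; they are hypothetical stand-ins, and the versioned state store is left out for brevity. The scenario simulates a planner that crashes after the sink has committed a batch, followed by a restarted planner that replays the log:

```scala
import scala.collection.mutable

// Toy model only: none of these classes are Spark APIs.
final case class OffsetRange(batchId: Long, from: Int, until: Int)

// Replayable source: the same offsets always return the same records.
final class ReplayableSource(records: Vector[String]) {
  def read(r: OffsetRange): Vector[String] = records.slice(r.from, r.until)
}

// Idempotent sink: committing an already-committed batch id is a no-op.
final class IdempotentSink {
  private val committed = mutable.LinkedHashMap.empty[Long, Vector[String]]
  def commit(batchId: Long, rows: Vector[String]): Unit =
    if (!committed.contains(batchId)) committed(batchId) = rows
  def output: Vector[String] = committed.values.flatten.toVector
}

// The planner logs the offset range before it executes the batch (write-ahead).
final class Planner(source: ReplayableSource,
                    sink: IdempotentSink,
                    wal: mutable.ArrayBuffer[OffsetRange]) {
  def runBatch(range: OffsetRange, crashAfterCommit: Boolean = false): Unit = {
    if (!wal.exists(_.batchId == range.batchId)) wal += range
    sink.commit(range.batchId, source.read(range))
    if (crashAfterCommit) throw new RuntimeException("simulated planner crash")
  }
  // After a restart, re-execute every logged range; the sink ignores duplicates.
  def recover(): Unit = wal.toList.foreach(r => runBatch(r))
}

val source = new ReplayableSource(Vector("e0", "e1", "e2", "e3"))
val sink = new IdempotentSink
val wal = mutable.ArrayBuffer.empty[OffsetRange]

// Batch 0 completes normally.
new Planner(source, sink, wal).runBatch(OffsetRange(0L, 0, 2))
// Batch 1 reaches the sink, but the planner crashes before it can finish.
val crashed = scala.util.Try(
  new Planner(source, sink, wal).runBatch(OffsetRange(1L, 2, 4), crashAfterCommit = true))
// A restarted planner replays the log; batch 1 is not written a second time.
new Planner(source, sink, wal).recover()
```

Because the offset range is logged before execution and the sink keys its commits by batch id, the replay neither loses nor duplicates records in this model.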

We can list the Physical Plan for our example of Structured Streaming using the explain method, as shown:

scala> spark.streams.active(0).explain 

We will explain the preceding output in more detail in Chapter 11, Tuning Spark SQL Components for Performance.
