Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Free Learning
Arrow right icon
Arrow up icon
GO TO TOP
Azure Databricks Cookbook

You're reading from   Azure Databricks Cookbook Accelerate and scale real-time analytics solutions using the Apache Spark-based analytics service

Arrow left icon
Product type Paperback
Published in Sep 2021
Publisher Packt
ISBN-13 9781789809718
Length 452 pages
Edition 1st Edition
Languages
Tools
Arrow right icon
Authors (2):
Arrow left icon
Vinod Jaiswal Vinod Jaiswal
Author Profile Icon Vinod Jaiswal
Vinod Jaiswal
Phani Raj Phani Raj
Author Profile Icon Phani Raj
Phani Raj
Arrow right icon
View More author details
Toc

Table of Contents (12) Chapters Close

Preface 1. Chapter 1: Creating an Azure Databricks Service 2. Chapter 2: Reading and Writing Data from and to Various Azure Services and File Formats FREE CHAPTER 3. Chapter 3: Understanding Spark Query Execution 4. Chapter 4: Working with Streaming Data 5. Chapter 5: Integrating with Azure Key Vault, App Configuration, and Log Analytics 6. Chapter 6: Exploring Delta Lake in Azure Databricks 7. Chapter 7: Implementing Near-Real-Time Analytics and Building a Modern Data Warehouse 8. Chapter 8: Databricks SQL 9. Chapter 9: DevOps Integrations and Implementing CI/CD for Azure Databricks 10. Chapter 10: Understanding Security and Monitoring in Azure Databricks 11. Other Books You May Enjoy

Reading and writing data from and to Azure Synapse SQL (dedicated SQL pool) using native connectors 

In this recipe, you will learn how to read and write data to Azure Synapse Analytics using Azure Databricks.

Azure Synapse Analytics is a data warehouse hosted in the cloud that leverages massively parallel processing (MPP) to run complex queries across large volumes of data.

Azure Synapse can be accessed from Databricks using the Azure Synapse connector. DataFrames can be directly loaded as a table in a Synapse Spark pool. Azure Blob storage is used as temporary storage to upload data between Azure Databricks and Azure Synapse with the Azure Synapse connector.

Getting ready

You will need to ensure you have the following items before starting to work on this recipe:

  • An Azure Synapse Analytics workspace. You can follow the steps mentioned in the following link to create Synapse Workspace and SQL Pool.

    https://docs.microsoft.com/en-in/azure/synapse-analytics/get-started-create-workspace

  • We are using Azure Databrick Runtime 7.3 LTS with Spark Cluster 3.0.1
  • A database master key for Azure Synapse Dedicated SQL Pool. Run the following command in the SQL Pool to create the Master Key.
CREATE MASTER KEY
  • Account Name and Storage keys for Azure Storage Blob —get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • Service principal details—get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • A mounted ADLS Gen-2 storage account—get this by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • Use the same Customer csvFiles that was used in the Reading and writing data from and to Azure Blob recipe.
  • Get the JDBC connection string of Synapse Workspace. Go to the Dedicated SQL Pool that you have created in the Synapse Workspace and in the Overview tab click on Show database connection strings. Under the Connection strings tab, click on JDBC and copy the connection string under JDBC (SQL authentication)

You can follow the steps by running the steps in the 2_5.Reading and Writing Data from and to Azure Synapse.ipynb notebook in your local cloned repository in the Chapter02 folder.

Upload the csvFiles folder in the Chapter02/Customer folder to the ADLS Gen2 account in the rawdata file system. We have mounted the rawdata container as /mnt/Gen2Source.

How to do it…

In this section, you will see the steps for reading data from an ADLS Gen2 storage account and writing data to Azure Synapse Analytics using the Azure Synapse connector:

  1. You need to mount the storage location for accessing the data files from the storage account. You can find detailed steps on how to mount ADLS Gen-2 in the Mounting ADLS Gen-2 and Azure Blob Storage to Azure Databricks File System recipe. Run the following to mount ADLS Gen-2 Storage Account.
    storageAccount="teststoragegen2"
    mountpoint = "/mnt/Gen2Source"
    storageEndPoint ="abfss://rawdata@{}.dfs.core.windows.net/".format(storageAccount)
    print ('Mount Point ='+mountpoint)
    #ClientId, TenantId and Secret is for the Application(ADLSGen2App) was have created as part of this recipe
    clientID ="xxx-xxx-xx-xxx"
    tenantID ="xx-xxx-xxx-xxx"
    clientSecret ="xx-xxx-xxx-xxx"
    oauth2Endpoint = "https://login.microsoftonline.com/{}/oauth2/token".format(tenantID)
    configs = {"fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientID,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": oauth2Endpoint}
    dbutils.fs.mount(
    source = storageEndPoint,
    mount_point = mountpoint,
    extra_configs = configs)
  2. Copy and run the following code to see the csv files in the ADLS Gen-2 account:
    display(dbutils.fs.ls("/mnt/Gen2Source/Customer/csvFiles "))
  3. Provide the Azure Storage account configuration like Storage Account name and Storage Key. This is used by both the Azure Databricks cluster and Azure Synapse Dedicated SQL Pool to access a common Blob Storage Account for exchanging data between them. Azure Synapse connector triggers the Spark job in Azure Databricks cluster to read and write data from and to the common Blob Storage Account.
    blobStorage = "stgcccookbook.blob.core.windows.net"
    blobContainer = "synapse"
    blobAccessKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
  4. The following code will be used for specifying an intermediate temporary folder required for moving data between Azure Databricks and Azure Synapse:
    tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
  5. Execute the following code to store the Azure Blob storage access keys in the Notebook session configuration. This configuration will not impact other Notebooks attached to the same cluster.
    acntInfo = "fs.azure.account.key."+ blobStorage
    spark.conf.set(
      acntInfo,
      blobAccessKey)
  6. Run the following code to load the DataFrame from the csv files in the ADLS Gen-2 Storage Account:
    customerDF = 
    spark.read.format("csv").option("header",True).option("inferSchema", True).load("dbfs:/mnt/Gen2Source/Customer/csvFiles") 
  7. Run the following command to store the JDBC URL for the Synapse Dedicated SQL Pool in a variable.
    # We have changed trustServerCertificate=true from trustServerCertificate=false. In certain cases, you might get errors like 
    '''
    The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "Failed to validate the server name in a certificate during Secure Sockets Layer (SSL) initialization.". ClientConnectionId:sadasd-asds-asdasd [ErrorCode = 0] [SQLState = 08S '''
      
    sqlDwUrl="jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
    db_table = "dbo.customer"
  8. Execute the following code to load the customerDF DataFrame as a table into Azure Synapse Dedicated SQL Pool. This creates a table named CustomerTable in the SQL database. You can query the table using SQL Server Management Studio (SSMS) or from Synapse Studio. This is the default save mode where when writing a DataFrame to Dedicated SQL Pool, if table already exists then an exception is thrown else it will create a table and populated the data in the table:
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .save()
  9. Connect to the SQL database using SSMS, and you can query data from the CustomerTable table:
    Figure 2.16 – Azure Synapse table verification

    Figure 2.16 – Azure Synapse table verification

  10. Run the following code which will append the data to an existing dbo.Customer table.
    # This code is writing to data into SQL Pool with append save option. In append save option data is appended to existing table.
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .mode("append")\
      .save()
  11. Run the following code which will overwrite the data in an existing dbo.Customer table.
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .mode("overwrite")\
      .save()
  12. Run the following code to read data from Azure Synapse Dedicated SQL Pool using an Azure Synapse connector:
    customerTabledf = spark.read \
      .format("com.databricks.spark.sqldw") \
      .option("url", sqlDwUrl) \
      .option("tempDir", tempDir) \
      .option("forwardSparkAzureStorageCredentials", "true") \
      .option("dbTable", db_table) \
      .load()
  13. You can see the result in the DataFrame by using the following code:
    customerTabledf.show()
  14. Instead of table you can use query to read data from Dedicated SQL Pool. You can see from the following code that we have used option("query",query) to run a query on the database.
    query= " select C_MKTSEGMENT, count(*) as Cnt from [dbo].[customer] group by C_MKTSEGMENT"
    df_query = spark.read \
      .format("com.databricks.spark.sqldw") \
      .option("url", sqlDwUrl) \
      .option("tempDir", tempDir) \
      .option("forwardSparkAzureStorageCredentials", "true") \
      .option("query", query) \
      .load()
  15. You can execute the following code to display the results.
    display(df_query.limit(5))

By the end of this recipe, you have learnt how to read and write data from and to Synapse Dedicated SQL Pool using the Azure Synapse Connector from Azure Databricks.

How it works…

In Azure Synapse, data loading and unloading operations are performed by PolyBase and are triggered by the Azure Synapse connector through JDBC.

For Databricks Runtime 7.0 and above, the Azure Synapse connector through JDBC uses COPY to load data into Azure Synapse.

The Azure Synapse connector triggers Apache Spark jobs to read and write data to the Blob storage container.

We are using spark.conf.set (acntInfo, blobAccessKey) so that Spark connects to the Storage Blob container using the built-in connectors. We can use ADLS Gen-2 as well to the store the data read from Synapse or data written to Synapse Dedicated SQL Pool.

Spark connects to Synapse using the JDBC drivers with username and password. Azure Synapse connects to Azure Storage Blob account using account key and it can use Managed Service Identity as well to connect.

Following is the JDBC connection string used in the Notebook. In this case we have changed trustServerCertificate value to true from false (which is default value) and by doing so, Microsoft JDBC Driver for SQL Server won't validate the SQL Server TLS certificate. This change must be done only in test environments and not in production.

sqlDwUrl="jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"

If you are not creating the Master Key in the Synapse Dedicated SQL Pool you will be getting the following error while reading the data from the Dedicated SQL Pool from Azure Databricks Notebook.

Underlying SQLException(s):
  - com.microsoft.sqlserver.jdbc.SQLServerException: Please create a master key in the database or open the master key in the session before performing this operation. [ErrorCode = 15581] [SQLState = S0006]

When you are writing the data to Dedicated SQL Pools you will see the following messages while code is getting executed n the Notebook.

Waiting for Azure Synapse Analytics to load intermediate data from wasbs://synapse@stgacccookbook.blob.core.windows.net/tempDirs/2021-08-07/04-19-43-514/cb6dd9a2-da6b-4748-a34d-0961d2df981f/ into "dbo"."customer" using COPY.

Spark is writing the csv files to the common Blob Storage as parquet files and then Synapse uses COPY statement to load the parquet files to the final tables. You can check in Blob Storage Account, and you will find the parquet files created. When you look at the DAG for the execution (you will learn more about DAG in upcoming recipes) you will find that Spark is reading the CSV files and writing to Blob Storage as parquet format.

When you are reading data from Synapse Dedicated SQL Pool tables you will see the following messages in the Notebook while the code is getting executed.

Waiting for Azure Synapse Analytics to unload intermediate data under wasbs://synapse@stgacccookbook.blob.core.windows.net/tempDirs/2021-08-07/04-24-56-781/esds43c1-bc9e-4dc4-bc07-c4368d20c467/ using Polybase

Azure Synapse is writing the data from Dedicated SQL Pool to common Blob storage as parquet files first with snappy compression and then its is read by Spark and displayed to the end user when customerTabledf.show() is executed.

We are setting up the account key and secret for the common Blob Storage in the session configuration associated with the Notebook using spark.conf.set and setting the value forwardSparkAzureStorageCredentials as true. By setting the value as true, Azure Synapse connector will forward the Storage Account access key to the Azure Synapse Dedicated Pool by creating a temporary Azure Database Scoped Credentials.

You have been reading a chapter from
Azure Databricks Cookbook
Published in: Sep 2021
Publisher: Packt
ISBN-13: 9781789809718
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $19.99/month. Cancel anytime
Banner background image