Reading and writing data from and to an Azure SQL database using native connectors
Reading data from and writing data to an Azure SQL database is a common step in data ingestion pipelines. Typically, one step in the pipeline either loads transformed data into Azure SQL or reads raw data from Azure SQL for further transformation.
In this recipe, you will learn how to read and write data using SQL Server JDBC Driver and the Apache Spark connector for Azure SQL.
Getting ready
At the time of writing, the Apache Spark connector for Azure SQL supports only Spark 2.4.x and 3.0.x clusters; this might change in the future. SQL Server JDBC Driver supports both Spark 2.4.x and 3.0.x clusters. Before we start working on the recipe, we need to create a Spark 2.4.x or 3.0.x cluster. You can follow the steps mentioned in the Creating a cluster from the UI to create 2.x clusters recipe from Chapter 1, Creating an Azure Databricks Service.
We have used Databricks Runtime Version 7.3 LTS with Spark 3.0.1 and Scala 2.12 for this recipe. The code has also been tested with Databricks Runtime Version 6.4, which includes Spark 2.4.5 and Scala 2.11.
You need to create an Azure SQL database—to do so, follow the steps at this link:
https://docs.microsoft.com/en-us/azure/azure-sql/database/single-database-create-quickstart?tabs=azure-portal
After your Azure SQL database is created, connect to the database and create the following table in the newly created database:
CREATE TABLE [dbo].[CUSTOMER](
    [C_CUSTKEY] [int] NULL,
    [C_NAME] [varchar](25) NULL,
    [C_ADDRESS] [varchar](40) NULL,
    [C_NATIONKEY] [smallint] NULL,
    [C_PHONE] [char](15) NULL,
    [C_ACCTBAL] [decimal](18, 0) NULL,
    [C_MKTSEGMENT] [char](10) NULL,
    [C_COMMENT] [varchar](117) NULL
) ON [PRIMARY]
GO
Once the table is created, you can proceed with the steps mentioned in the How to do it… section. You can follow along with the steps in the notebook 2_4.Reading and Writing from and to Azure SQL Database.ipynb.
How to do it…
You will learn how to use SQL Server JDBC Driver and the Apache Spark connector for Azure SQL to read and write data to and from an Azure SQL database. You will learn how to install the Spark connector for Azure SQL in a Databricks cluster.
Here are the steps to write data to and read data from an Azure SQL database using SQL Server JDBC Driver:
- First, create a variable for the connection string and the table from which we will be reading and writing the data. We will load the csv files from ADLS Gen2 that we saw in the Reading and writing data from and to ADLS Gen2 recipe:
# Details about connection string
logicalServername = "demologicalserver.database.windows.net"
databaseName = "demoDB"
tableName = "CUSTOMER"
userName = "sqladmin"
password = "Password@Strong12345" # Please specify password here
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(logicalServername, 1433, databaseName)
connectionProperties = {
    "user" : userName,
    "password" : password,
    "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
- As you can see from the preceding step, the driver we are using is called SQLServerDriver, which comes installed as part of Databricks Runtime.
- Create a schema for the csv files, which are stored in ADLS Gen-2 with the storage mounted to DBFS. Follow the steps mentioned in the third recipe, Reading and writing data from and to ADLS Gen2, to learn how to mount storage to DBFS:
# Import the Spark SQL types used in the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
# Creating a schema which can be passed while creating the DataFrame
cust_schema = StructType([
    StructField("C_CUSTKEY", IntegerType()),
    StructField("C_NAME", StringType()),
    StructField("C_ADDRESS", StringType()),
    StructField("C_NATIONKEY", ShortType()),
    StructField("C_PHONE", StringType()),
    StructField("C_ACCTBAL", DecimalType(18,2)),
    StructField("C_MKTSEGMENT", StringType()),
    StructField("C_COMMENT", StringType())
])
- Once a schema is created, we will read the csv files in a DataFrame:
# Reading customer csv files in a DataFrame. This DataFrame will be written to the Customer table in Azure SQL DB
df_cust = spark.read.format("csv").option("header", True).schema(cust_schema).load("dbfs:/mnt/Gen2/Customer/csvFiles")
- After the preceding step is executed, we will write the DataFrame to the dbo.CUSTOMER table that we have already created as part of the Getting ready section:
df_cust.write.jdbc(jdbcUrl,
                   mode="append",
                   table=tableName,
                   properties=connectionProperties)
- After loading the data, we will read the table to count the number of records inserted in the table:
df_jdbcRead = spark.read.jdbc(jdbcUrl,
                              table=tableName,
                              properties=connectionProperties)
# Counting number of rows
df_jdbcRead.count()
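The connection setup from the first step can also be wrapped in a small helper so that the password is not hard-coded in the notebook. The following is a minimal sketch under stated assumptions, not part of the original notebook: the function name build_jdbc_settings and the environment variable SQL_PASSWORD are hypothetical, and on Databricks a secret scope would usually be a better place to keep the password.

```python
import os


def build_jdbc_settings(server, database, user, password, port=1433):
    """Return (jdbc_url, connection_properties) in the shape used by the steps above."""
    jdbc_url = "jdbc:sqlserver://{0}:{1};database={2}".format(server, port, database)
    properties = {
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    return jdbc_url, properties


# SQL_PASSWORD is a hypothetical environment variable name (an assumption);
# an empty string is used when it is not set.
password = os.environ.get("SQL_PASSWORD", "")

jdbcUrl, connectionProperties = build_jdbc_settings(
    "demologicalserver.database.windows.net", "demoDB", "sqladmin", password
)
```

The returned jdbcUrl and connectionProperties can then be passed to df_cust.write.jdbc and spark.read.jdbc in the same way as in the preceding steps.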
Here are the steps to read data from and write data to an Azure SQL database using the Apache Spark connector for Azure SQL.
You can also download the connector from https://search.maven.org/search?q=spark-mssql-connector.
- After a Spark 2.4.x or 3.0.x cluster is created, you need to install the Spark connector for Azure SQL DB from Maven. Make sure you use the coordinates as mentioned in the preceding table. Go to the Databricks Clusters page and click on Libraries. Then, click on Install New and select the library source as Maven. Now, click on Search Packages and search for spark-mssql-connector.
- Under Search Packages, select Maven Central, search for spark-mssql-connector, select the version with Artifact Id spark-mssql-connector_2.12 and Releases 1.1.0 (as we are using a Spark 3.0.1 cluster), and click on Select. You can use the latest version that is available when you are going through the recipe. If you are using a Spark 2.4.x cluster, then you must use the version with Artifact Id spark-mssql-connector and Releases version 1.0.2.
- After selecting the package, it gets installed and you will see the status as Installed.
- After the Spark connector for Azure SQL is installed, you can run the following code to set the connection string for Azure SQL:
server_name = f"jdbc:sqlserver://{logicalServername}"
database_name = "demoDB"
url = server_name + ";" + "databaseName=" + database_name + ";"
table_name = "dbo.Customer"
username = "sqladmin"
password = "xxxxxx" # Please specify password here
- After the Spark connector is installed, we will read the records from the dbo.CUSTOMER table using the newly installed Spark connector for Azure SQL:
sparkconnectorDF = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password).load()
- Run the following code to check the schema of the DataFrame created as part of the preceding step:
# printSchema() prints the schema itself and returns None, so it does not need to be wrapped in display()
sparkconnectorDF.printSchema()
- To view a few records from the DataFrame, run the following code:
display(sparkconnectorDF.limit(10))
- Create a schema for the csv files, which are stored in ADLS Gen-2 with the storage mounted to DBFS. Follow the steps mentioned in the Reading and writing data from and to ADLS Gen2 recipe to learn how to mount an ADLS Gen-2 Storage Account to DBFS:
# Import the Spark SQL types used in the schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
# Creating a schema which can be passed while creating the DataFrame
cust_schema = StructType([
    StructField("C_CUSTKEY", IntegerType()),
    StructField("C_NAME", StringType()),
    StructField("C_ADDRESS", StringType()),
    StructField("C_NATIONKEY", ShortType()),
    StructField("C_PHONE", StringType()),
    StructField("C_ACCTBAL", DecimalType(18,2)),
    StructField("C_MKTSEGMENT", StringType()),
    StructField("C_COMMENT", StringType())
])
- Once a schema is created, we will load the csv files in a DataFrame by running the following code:
df_cust = spark.read.format("csv").option("header", True).schema(cust_schema).load("dbfs:/mnt/Gen2/Customer/csvFiles")
- In the following step, we will learn how we can write the DataFrame to an Azure SQL database table using append mode:
# Appending records to the existing table
try:
    df_cust.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", tableName) \
        .option("user", userName) \
        .option("password", password) \
        .save()
except ValueError as error:
    print("Connector write failed", error)
- The preceding code will append the data in the existing table; if no table exists, then it will throw an error. The following code will overwrite the existing data in the table:
try:
    df_cust.write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("overwrite") \
        .option("truncate", True) \
        .option("url", url) \
        .option("dbtable", tableName) \
        .option("user", userName) \
        .option("password", password) \
        .save()
except ValueError as error:
    print("Connector write failed", error)
- As the last step, we will read the data loaded in the customer table to ensure the data is loaded properly:
# Read the data from the table
sparkconnectorDF = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password).load()
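The connector reads and writes in the preceding steps all pass the same four options. As a minimal sketch, assuming the same server, database, and table names used in this recipe, these options can be collected once in a dictionary and passed with options(**...). The names CONNECTOR_FORMAT and connector_options are hypothetical helpers introduced here for illustration, and the password is a placeholder.

```python
# Format name used by the Spark connector in the steps above
CONNECTOR_FORMAT = "com.microsoft.sqlserver.jdbc.spark"


def connector_options(server, database, table, user, password):
    """Build the option dictionary shared by the connector reads and writes above."""
    url = "jdbc:sqlserver://{0};databaseName={1};".format(server, database)
    return {"url": url, "dbtable": table, "user": user, "password": password}


opts = connector_options(
    "demologicalserver.database.windows.net",
    "demoDB",
    "dbo.Customer",
    "sqladmin",
    "xxxxxx",  # placeholder: specify the real password here
)

# In a Databricks notebook with the connector installed (not executed in this sketch):
# sparkconnectorDF = spark.read.format(CONNECTOR_FORMAT).options(**opts).load()
# df_cust.write.format(CONNECTOR_FORMAT).mode("append").options(**opts).save()
```

Using one dictionary also avoids mixing the two sets of variable names (tableName and userName from the JDBC steps, table_name and username from the connector steps).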
How it works…
The Apache Spark connector works with the latest versions of Spark 2.4.x and Spark 3.0.x. It can be used for both SQL Server and Azure SQL Database and is customized for them to perform big data analytics efficiently. The following document outlines the benefits of using the Spark connector and provides a performance comparison between the JDBC connector and the Spark connector:
https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15
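Because the connector build depends on the Spark version of the cluster, the choice of library can be written down as a small lookup. This is only a sketch: the artifact IDs and release versions are the ones named in this recipe, the group ID com.microsoft.azure is an assumption that should be verified on Maven Central, and the function name connector_coordinate is hypothetical.

```python
def connector_coordinate(spark_version):
    """Return a Maven coordinate for a Spark version string such as "3.0.1"."""
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    group_id = "com.microsoft.azure"  # assumption: verify on Maven Central
    if (major, minor) == (3, 0):
        # Spark 3.0.x clusters use the Scala 2.12 artifact named in this recipe
        return group_id + ":spark-mssql-connector_2.12:1.1.0"
    if (major, minor) == (2, 4):
        # Spark 2.4.x clusters use the artifact without a Scala suffix
        return group_id + ":spark-mssql-connector:1.0.2"
    raise ValueError("No connector version listed in this recipe for Spark " + spark_version)


print(connector_coordinate("3.0.1"))
```

Newer connector releases may exist by the time you read this, so treat the two entries as the versions tested in this recipe rather than a complete list.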
To overwrite the data using the Spark connector, we use overwrite mode, which drops and recreates the table with a schema based on the source DataFrame schema. None of the indexes that were present on the table are added back after it is recreated. If we want to keep the indexes in overwrite mode, we need to set the truncate option to True. With this option, the table is truncated instead of being dropped and recreated, so its existing indexes are preserved.
To simply append data to an existing table, we use append mode, in which the existing table is not dropped or recreated. If the table is not found, an error is thrown. This mode is used when we are just inserting data into a raw table. If it's a staging table that we want to truncate before loading, then we need to use overwrite mode with the truncate option.
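The choice between these modes can be summarized as a small lookup from the kind of load to the save mode and extra options. This is a minimal sketch, not part of the original notebook: the function name write_settings and the load type labels are hypothetical and introduced here only for illustration.

```python
def write_settings(load_type):
    """Map a load type to the save mode and extra connector options described above."""
    if load_type == "append":
        # Raw table: the existing table is kept and rows are added to it
        return {"mode": "append", "options": {}}
    if load_type == "truncate_and_load":
        # Staging table: overwrite mode with the truncate option
        return {"mode": "overwrite", "options": {"truncate": True}}
    if load_type == "drop_and_recreate":
        # Overwrite mode without truncate: the table is dropped and recreated
        # from the DataFrame schema
        return {"mode": "overwrite", "options": {}}
    raise ValueError("Unknown load type: " + load_type)


settings = write_settings("truncate_and_load")

# In a notebook (not executed in this sketch), with url, table_name, username
# and password defined as in the steps above:
# df_cust.write.format("com.microsoft.sqlserver.jdbc.spark") \
#     .mode(settings["mode"]) \
#     .options(**settings["options"]) \
#     .option("url", url) \
#     .option("dbtable", table_name) \
#     .option("user", username) \
#     .option("password", password) \
#     .save()
```

Keeping the decision in one place makes it harder to run an overwrite without truncate against a table whose indexes matter.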