Search icon CANCEL
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Conferences
Free Learning
Arrow right icon
Arrow up icon
GO TO TOP
Azure Databricks Cookbook

You're reading from   Azure Databricks Cookbook Accelerate and scale real-time analytics solutions using the Apache Spark-based analytics service

Arrow left icon
Product type Paperback
Published in Sep 2021
Publisher Packt
ISBN-13 9781789809718
Length 452 pages
Edition 1st Edition
Languages
Tools
Arrow right icon
Authors (2):
Arrow left icon
Vinod Jaiswal Vinod Jaiswal
Author Profile Icon Vinod Jaiswal
Vinod Jaiswal
Phani Raj Phani Raj
Author Profile Icon Phani Raj
Phani Raj
Arrow right icon
View More author details
Toc

Table of Contents (12) Chapters Close

Preface 1. Chapter 1: Creating an Azure Databricks Service 2. Chapter 2: Reading and Writing Data from and to Various Azure Services and File Formats FREE CHAPTER 3. Chapter 3: Understanding Spark Query Execution 4. Chapter 4: Working with Streaming Data 5. Chapter 5: Integrating with Azure Key Vault, App Configuration, and Log Analytics 6. Chapter 6: Exploring Delta Lake in Azure Databricks 7. Chapter 7: Implementing Near-Real-Time Analytics and Building a Modern Data Warehouse 8. Chapter 8: Databricks SQL 9. Chapter 9: DevOps Integrations and Implementing CI/CD for Azure Databricks 10. Chapter 10: Understanding Security and Monitoring in Azure Databricks 11. Other Books You May Enjoy

Reading and writing data from and to ADLS Gen2

In this recipe, you will learn how to read and write data to ADLS Gen2 from Databricks. We can do this by following these two methods:

  • Mounting storage: Covered in the Mounting ADLS Gen2 and Azure Blob storage to Azure DBFS recipe of this chapter.
  • Directly accessing the ADLS Gen2 storage using a SAS token and a service principal: In this scenario, we will not mount the storage, but we will directly access the storage endpoint to read and write files using storage keys, service principals, and OAuth 2.0.

ADLS Gen2 provides file system semantics, which provides security to folders and files, and the hierarchical directory structure provides efficient access to the data in the storage.

By the end of this recipe, you will know multiple ways to read/write files from and to an ADLS Gen2 account.

Getting ready

You will need to ensure you have the following items before starting to work on this recipe:

  • An ADLS Gen2 account, mounted by following the steps in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.
  • Storage keys—you can get these by following the steps mentioned in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.

You can follow along by running the steps in the 2-3.Reading and Writing Data from and to ADLS Gen-2.ipynb notebook in your local cloned repository in the Chapter02 folder.

Upload the csvFiles folder in the Chapter02/Customer folder to the ADLS Gen2 account in the rawdata file system.

Note

We have tested the steps mentioned in this recipe on Azure Databricks Runtime version 6.4 which includes Spark 2.4.5 and on Runtime version 7.3 LTS which includes Spark 3.0.1

How to do it…

We will learn how to read CSV files from the mount point and the ADLS Gen2 storage directly. We will perform basic aggregation on the DataFrame, such as counting and storing the result in another csv file.

Working with the mount point, we'll proceed as follows:

  1. Let's list the CSV files we are trying to read from the mount point:
    display(dbutils.fs.ls("/mnt/Gen2/Customer/csvFiles/"))
  2. We will read the csv files directly from the mount point without specifying any schema options:
    df_cust= spark.read.format("csv").option("header",True).load("/mnt/Gen2/Customer/csvFiles/")
  3. When you run df_cust.printSchema(), you will find that the datatypes for all columns are strings.
  4. Next, we will run the same code as in the preceding step, but this time asking Spark to infer the schema from csv files by using option("header","true"):
    df_cust= spark.read.format("csv").option("header",True).option("inferSchema", True).load("/mnt/Gen2/Customer/csvFiles/")
  5. Run df_cust.printSchema(), and you will find the datatype has changed for a few columns such as CustKey, where the datatype now being shown is an integer instead of a string.
  6. We will now create a schema and explicitly provide it while reading the csv files using a DataFrame:
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DoubleType()),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  7. Create a DataFrame by using the schema created in the preceding step:
    df_cust= spark.read.format("csv").option("header",True).schema(cust_schema).load("/mnt/Gen2/Customer/csvFiles/")
  8. In the following step, we will be performing basic aggregation on the DataFrame:
    df_cust_agg = df_cust.groupBy("C_MKTSEGMENT") .agg(sum("C_ACCTBAL").cast('decimal(20,3)').alias("sum_acctbal"), avg("C_ACCTBAL").alias("avg_acctbal"), max("C_ACCTBAL").alias("max_bonus")).orderBy("avg_acctbal",ascending=False)
  9. We will write the DataFrame we created in the preceding step to the mount point and save it in CSV format:
    df_cust_agg.write.mode("overwrite").option("header", "true").csv("/mnt/Gen-2/CustMarketSegmentAgg/"))
  10. To list the CSV file created, run the following code:
    (dbutils.fs.ls("/mnt/Gen-2/CustMarketSegmentAgg/"))

We'll now work with an ADLS Gen2 storage account without mounting it to DBFS:

  1. You can access an ADLS Gen2 storage account directly without mounting to DBFS using OAuth 2.0 and a service principal. You can access any ADLS Gen2 storage account that the service principal has permissions on. We need to set the credentials first in our notebook before we can directly access the file system. clientID and clientSecret are the variables defined in the notebook:
    spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type.cookobookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
    spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
    spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)
  2. After the preceding step is executed, you can directly read the csv files from the ADLS Gen2 storage account without mounting it:
    df_direct = spark.read.format("csv").option("header",True).schema(cust_schema).load("abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/csvFiles")
  3. You can view a few records of the DataFrame by executing the following code:
    display(df_direct.limit(10))
  4. We will now write a DataFrame in Parquet format in the ADLS Gen2 storage account directly, without using the mount point, by executing the following code. We are repartitioning the DataFrame to ensure we are creating 10 Parquet files:
    parquetCustomerDestDirect = "abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/parquetFiles"
    df_direct_repart=df_direct.repartition(10)
    df_direct_repart.write.mode("overwrite").option("header", "true").parquet(parquetCustomerDestDirect)
  5. You can create a DataFrame on the Parquet files created in the preceding step to ensure we are able to read the data:
    df_parquet = spark.read.format("parquet").option("header",True).schema(cust_schema).load("abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/parquetFiles")
  6. You can view the Parquet files created in the preceding step by executing the following code:
    display(dbutils.fs.ls(parquetCustomerDestDirect))

How it works…

The following code is set to directly access the ADL Gen2 storage account without mounting to DBFS. These settings are applicable when we are using DataFrame or dataset APIs:

spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.cookbookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)

You should set the preceding values in your notebook session if you want the users to directly access the ADLS Gen2 storage account without mounting to DBFS. This method is useful when you are doing some ad hoc analysis and don't want users to create multiple mount points when you are trying to access data from various ADLS Gen2 storage accounts.

You have been reading a chapter from
Azure Databricks Cookbook
Published in: Sep 2021
Publisher: Packt
ISBN-13: 9781789809718
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at €18.99/month. Cancel anytime