Reading and writing data from and to ADLS Gen2
In this recipe, you will learn how to read data from and write data to ADLS Gen2 from Databricks. We can do this using either of the following two methods:
- Mounting storage: Covered in the Mounting ADLS Gen2 and Azure Blob storage to Azure DBFS recipe of this chapter.
- Directly accessing the ADLS Gen2 storage: In this scenario, we will not mount the storage. Instead, we will access the storage endpoint directly to read and write files, authenticating with storage keys or with a service principal and OAuth 2.0.
ADLS Gen2 provides file system semantics, so security can be applied at the folder and file level, and its hierarchical directory structure provides efficient access to the data in the storage.
By the end of this recipe, you will know multiple ways to read/write files from and to an ADLS Gen2 account.
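To make the difference between the two methods concrete: a mounted location is addressed with a DBFS path such as /mnt/..., while direct access uses an abfss:// URI that names both the file system and the storage account. The helper below is a minimal sketch that assembles such a URI from its parts. The function name abfss_uri is hypothetical (it is not part of any Databricks API), and the account and file system names are the ones used later in this recipe.

```python
def abfss_uri(file_system: str, account: str, path: str = "") -> str:
    """Build an abfss:// URI for a path in an ADLS Gen2 file system.

    Hypothetical helper for illustration; it only formats a string.
    """
    # Direct access addresses the dfs endpoint of the storage account.
    base = f"abfss://{file_system}@{account}.dfs.core.windows.net"
    # Strip leading slashes so there is exactly one slash after the host.
    return f"{base}/{path.lstrip('/')}" if path else base


csv_dir = abfss_uri("rawdata", "cookbookadlsgen2storage", "Customer/csvFiles")
print(csv_dir)
```

The resulting string can be passed to load() in place of a mount path once the session has credentials for the storage account.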
Getting ready
You will need to ensure you have the following items before starting to work on this recipe:
- An ADLS Gen2 account, mounted by following the steps in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.
- Storage keys—you can get these by following the steps mentioned in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.
You can follow along by running the steps in the 2-3.Reading and Writing Data from and to ADLS Gen-2.ipynb notebook in your local cloned repository in the Chapter02 folder.
Upload the csvFiles folder in the Chapter02/Customer folder to the ADLS Gen2 account in the rawdata file system.
Note
We have tested the steps mentioned in this recipe on Azure Databricks Runtime version 6.4, which includes Spark 2.4.5, and on Runtime version 7.3 LTS, which includes Spark 3.0.1.
How to do it…
We will learn how to read CSV files both from the mount point and directly from the ADLS Gen2 storage. We will then perform basic aggregations on the DataFrame, such as sums and averages, and store the result in another csv file.
Working with the mount point, we'll proceed as follows:
- Let's list the CSV files we are trying to read from the mount point:
display(dbutils.fs.ls("/mnt/Gen2/Customer/csvFiles/"))
- We will read the csv files directly from the mount point without specifying any schema options:
df_cust = spark.read.format("csv").option("header", True).load("/mnt/Gen2/Customer/csvFiles/")
- When you run df_cust.printSchema(), you will find that the datatypes for all columns are strings.
- Next, we will run the same code as in the preceding step, but this time asking Spark to infer the schema from the csv files by using option("inferSchema", True):
df_cust = spark.read.format("csv").option("header", True).option("inferSchema", True).load("/mnt/Gen2/Customer/csvFiles/")
- Run df_cust.printSchema(), and you will find the datatype has changed for a few columns such as CustKey, where the datatype now being shown is an integer instead of a string.
- We will now create a schema and explicitly provide it while reading the csv files into a DataFrame:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DoubleType

cust_schema = StructType([
    StructField("C_CUSTKEY", IntegerType()),
    StructField("C_NAME", StringType()),
    StructField("C_ADDRESS", StringType()),
    StructField("C_NATIONKEY", ShortType()),
    StructField("C_PHONE", StringType()),
    StructField("C_ACCTBAL", DoubleType()),
    StructField("C_MKTSEGMENT", StringType()),
    StructField("C_COMMENT", StringType())
])
- Create a DataFrame by using the schema created in the preceding step:
df_cust= spark.read.format("csv").option("header",True).schema(cust_schema).load("/mnt/Gen2/Customer/csvFiles/")
- In the following step, we will be performing basic aggregation on the DataFrame:
from pyspark.sql.functions import sum, avg, max

df_cust_agg = (df_cust.groupBy("C_MKTSEGMENT")
    .agg(sum("C_ACCTBAL").cast('decimal(20,3)').alias("sum_acctbal"),
         avg("C_ACCTBAL").alias("avg_acctbal"),
         max("C_ACCTBAL").alias("max_bonus"))
    .orderBy("avg_acctbal", ascending=False))
- We will write the DataFrame we created in the preceding step to the mount point and save it in CSV format:
df_cust_agg.write.mode("overwrite").option("header", "true").csv("/mnt/Gen2/CustMarketSegmentAgg/")
- To list the CSV file created, run the following code:
display(dbutils.fs.ls("/mnt/Gen2/CustMarketSegmentAgg/"))
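As an alternative to building a StructType by hand, the DataFrame reader's schema() method also accepts a DDL-formatted string. The sketch below assembles such a string for the customer columns used in this recipe. The columns list and the to_ddl helper are illustrative names, not part of Spark, and the Spark call itself is left as a comment because it needs a running cluster.

```python
# Column names and Spark SQL types mirroring the explicit schema in this recipe.
columns = [
    ("C_CUSTKEY", "INT"),
    ("C_NAME", "STRING"),
    ("C_ADDRESS", "STRING"),
    ("C_NATIONKEY", "SMALLINT"),
    ("C_PHONE", "STRING"),
    ("C_ACCTBAL", "DOUBLE"),
    ("C_MKTSEGMENT", "STRING"),
    ("C_COMMENT", "STRING"),
]


def to_ddl(cols):
    """Join (name, type) pairs into a DDL schema string (hypothetical helper)."""
    return ", ".join(f"{name} {dtype}" for name, dtype in cols)


cust_schema_ddl = to_ddl(columns)
print(cust_schema_ddl)

# On a cluster, assuming the mount point from this recipe exists:
# df_cust = (spark.read.format("csv")
#            .option("header", True)
#            .schema(cust_schema_ddl)
#            .load("/mnt/Gen2/Customer/csvFiles/"))
```

Keeping the column list as data makes it easy to reuse the same definition for several reads, at the cost of losing the type checking that StructType construction gives you.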
We'll now work with an ADLS Gen2 storage account without mounting it to DBFS:
- You can access an ADLS Gen2 storage account directly without mounting to DBFS using OAuth 2.0 and a service principal. You can access any ADLS Gen2 storage account that the service principal has permissions on. We need to set the credentials first in our notebook before we can directly access the file system. clientID, clientSecret, and oauth2Endpoint are variables defined in the notebook:
spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.cookbookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)
- After the preceding step is executed, you can directly read the csv files from the ADLS Gen2 storage account without mounting it:
df_direct = spark.read.format("csv").option("header", True).schema(cust_schema).load("abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/csvFiles")
- You can view a few records of the DataFrame by executing the following code:
display(df_direct.limit(10))
- We will now write a DataFrame in Parquet format in the ADLS Gen2 storage account directly, without using the mount point, by executing the following code. We are repartitioning the DataFrame into 10 partitions so that 10 Parquet files are created:
parquetCustomerDestDirect = "abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/parquetFiles"
df_direct_repart = df_direct.repartition(10)
df_direct_repart.write.mode("overwrite").parquet(parquetCustomerDestDirect)
- You can create a DataFrame on the Parquet files created in the preceding step to ensure we are able to read the data:
df_parquet = spark.read.format("parquet").schema(cust_schema).load("abfss://rawdata@cookbookadlsgen2storage.dfs.core.windows.net/Customer/parquetFiles")
- You can view the Parquet files created in the preceding step by executing the following code:
display(dbutils.fs.ls(parquetCustomerDestDirect))
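Spark writes each output as a directory that holds one part file per non-empty partition alongside marker files such as _SUCCESS, so the listing above typically shows more entries than there are data files. A small filter makes it easy to confirm the partition count. This is only a sketch: parquet_part_files is a hypothetical helper, and the sample file names are made up for illustration rather than copied from a real run.

```python
def parquet_part_files(names):
    """Return only the Parquet data files from a directory listing."""
    return [n for n in names if n.startswith("part-") and n.endswith(".parquet")]


# Made-up listing: two data files plus a marker file.
sample = [
    "_SUCCESS",
    "part-00000-abc.snappy.parquet",
    "part-00001-abc.snappy.parquet",
]
print(len(parquet_part_files(sample)))

# On a cluster, the names would come from the listing in the last step:
# names = [f.name for f in dbutils.fs.ls(parquetCustomerDestDirect)]
# After repartition(10) on a non-trivial dataset,
# len(parquet_part_files(names)) is expected to be 10.
```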
How it works…
The following settings configure direct access to the ADLS Gen2 storage account without mounting it to DBFS. They apply when we are using the DataFrame or Dataset APIs:
spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.cookbookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)
Set the preceding values in your notebook session if you want to access the ADLS Gen2 storage account directly without mounting it to DBFS. This method is useful for ad hoc analysis, when you need data from several ADLS Gen2 storage accounts and don't want users to create a mount point for each one.
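Because the five settings repeat the storage account name in every key, a typo in a single key is easy to make and hard to spot. One way to reduce that risk is to generate the keys from one account name. The sketch below rests on assumptions: oauth_conf and tenant_id are hypothetical names, the credential values are placeholders, and the token endpoint follows the common Azure AD format, which you should check against your own tenant.

```python
def oauth_conf(account, client_id, client_secret, tenant_id):
    """Return the Spark settings for OAuth access to one ADLS Gen2 account."""
    host = f"{account}.dfs.core.windows.net"
    # Assumed Azure AD token endpoint format for the given tenant.
    endpoint = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}": endpoint,
    }


# Placeholder values for illustration only; never hard-code real secrets.
conf = oauth_conf("cookbookadlsgen2storage", "<client-id>", "<client-secret>", "<tenant-id>")
for key in conf:
    print(key)

# In a notebook session, the settings would then be applied with:
# for key, value in conf.items():
#     spark.conf.set(key, value)
```

Calling the helper once per storage account gives each account its own set of keys, which suits the ad hoc, multi-account scenario described above.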