Data Engineering with Databricks Cookbook

You're reading from Data Engineering with Databricks Cookbook: Build effective data and AI solutions using Apache Spark, Databricks, and Delta Lake

Product type: Paperback
Published: May 2024
Publisher: Packt
ISBN-13: 9781837633357
Length: 438 pages
Edition: 1st Edition
Author: Pulkit Chadha
Table of Contents

Preface
Part 1 – Working with Apache Spark and Delta Lake
Chapter 1: Data Ingestion and Data Extraction with Apache Spark
Chapter 2: Data Transformation and Data Manipulation with Apache Spark
Chapter 3: Data Management with Delta Lake
Chapter 4: Ingesting Streaming Data
Chapter 5: Processing Streaming Data
Chapter 6: Performance Tuning with Apache Spark
Chapter 7: Performance Tuning in Delta Lake
Part 2 – Data Engineering Capabilities within Databricks
Chapter 8: Orchestration and Scheduling Data Pipeline with Databricks Workflows
Chapter 9: Building Data Pipelines with Delta Live Tables
Chapter 10: Data Governance with Unity Catalog
Chapter 11: Implementing DataOps and DevOps on Databricks
Index
Other Books You May Enjoy

Reading CSV data with Apache Spark

Reading CSV data is a common task in data engineering and analysis, and Apache Spark provides a powerful and efficient way to process it. Spark supports various file formats, including CSV, and offers many options for reading and parsing such data. In this recipe, we will learn how to read CSV data with Apache Spark using Python.

How to do it...

  1. Import libraries: Import the required libraries and create a SparkSession object:
    from pyspark.sql import SparkSession
    # Create a SparkSession connected to the standalone Spark master
    spark = (SparkSession.builder
        .appName("read-csv-data")
        .master("spark://spark-master:7077")
        .config("spark.executor.memory", "512m")
        .getOrCreate())
    # Reduce log verbosity so only errors are shown
    spark.sparkContext.setLogLevel("ERROR")
  2. Read the CSV data with an inferred schema: Read the CSV file using the read property of SparkSession, which returns a DataFrameReader. In the following code, we specify the format of the file as csv using the format method, set the header option to true to indicate that the first row of the CSV file contains the column names, and finally specify the path to the CSV file to load it into a DataFrame using the load method:
    df = (spark.read.format("csv")
        .option("header", "true")
        .load("../data/netflix_titles.csv"))

Note

If your CSV file does not have a header row, you can set the header option to false, as shown:

df = (spark.read.format("csv")

.option("header", "false")

.load("../data/netflix_titles.csv"))

  3. Display sample data in the DataFrame: You can display the contents of the DataFrame using the show() method. This will display the first 20 rows of the DataFrame. If you want to display more or fewer rows, you can pass an integer argument to the show() method:
    # Display contents of DataFrame
    df.show()
    # Alternatively
    # df.show(10, truncate=False)
  4. Read the CSV data with an explicit schema: First, you need to define the schema for your CSV file. You can do this using the StructType and StructField classes in Spark SQL. For example, if your CSV file had three columns, "name", "age", and "gender", you could declare "name" as a string, "age" as an integer, and "gender" as a string. For the netflix_titles.csv file used in this recipe, the schema looks like this:
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
    schema = StructType([
        StructField("show_id", StringType(), True),
        StructField("type", StringType(), True),
        StructField("title", StringType(), True),
        StructField("director", StringType(), True),
        StructField("cast", StringType(), True),
        StructField("country", StringType(), True),
        StructField("date_added", DateType(), True),
        StructField("release_year", IntegerType(), True),
        StructField("rating", StringType(), True),
        StructField("duration", StringType(), True),
        StructField("listed_in", StringType(), True),
        StructField("description", StringType(), True)])
  5. Read the CSV file: Next, you need to read the CSV file using the spark.read.format("csv") method and pass in the schema with the schema method. In this example, we’re reading a CSV file located at ../data/netflix_titles.csv, with the first row as the header, and using the previously defined schema:
    df = (spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load("../data/netflix_titles.csv"))
  6. You can display the contents of the DataFrame using the show() method. Here, we pass 5 as the n argument to display the first five rows of the DataFrame. If you want to display more or fewer rows, you can change the n argument to the show() method:
    df.show(5)
  7. Finally, we need to stop the Spark session to release the resources used by Spark:
    spark.stop()

Common issues faced while working with CSV data

Let’s take a look at some issues frequently encountered when working with CSV data:

  • Issue: Delimiter value is present within the data.

    Solution: You can use the option("escapeQuotes", "true") method while reading the CSV file to specify how to handle the delimiter value within the column values. In the following code, we specified that there are escape quotes for values that have the delimiter character:

    df = (spark.read.format("csv")
        .option("header", "true")
        .option("nullValue", "null")
        .option("escapeQuotes", "true")
        .schema(schema)
        .load("../data/netflix_titles.csv"))
  • Issue: Null values and empty values are not handled correctly.

    Solution: You can use the option("nullValue", "null") method while reading the CSV file to specify how null values are represented in the CSV file. You can also use the option("emptyValue", "") method to specify how to handle empty values. In the following code, we specified the "null" value as the representation of null values and empty values to be "" in the CSV file. You can specify any other string or character that represents null values in your CSV file:

    df = (spark.read.format("csv")
        .option("header", "true")
        .option("nullValue", "null")
        .option("emptyValues", "")
        .schema(schema)
        .load("../data/netflix_titles.csv"))
  • Issue: Date formats are different and not handled correctly.

    Solution: You can use the option("dateFormat", "LLLL d, y") method while reading the CSV file to specify how date column values are represented in the CSV file. In the following code, we specified that date column values are formatted as "LLLL d, y". You can specify any other datetime patterns for formatting and parsing dates in a CSV file:

    df = (spark.read.format("csv")
        .option("header", "true")
        .option("nullValue", "null")
        .option("dateFormat", "LLLL d, y")
        .schema(schema)
        .load("../data/netflix_titles.csv"))

Note

When we used the read API in step 2, Apache Spark did not execute any jobs. This is because Apache Spark uses a lazy evaluation technique to delay the execution of transformations until an action is called. This allows Spark to optimize the execution plan and recover from failures. However, it can also lead to some challenges in debugging and troubleshooting. To work effectively with lazy evaluation, it’s important to understand the distinction between transformations and actions and to consider the order and timing of transformations and actions in your code.
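
As a quick illustration, here is a minimal sketch using the DataFrame (df) and columns from this recipe: the filter and select calls below are transformations that only build up the execution plan, and the count() action at the end is what actually triggers a Spark job:

# Transformations: Spark only records the plan; no job runs yet
movies = df.filter(df["type"] == "Movie").select("title", "release_year")
# Action: triggers a job that reads the CSV file and computes the result
print(movies.count())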

There’s more…

Here are some additional details to help you become more knowledgeable about reading CSV data with Apache Spark:

  • Specifying options when reading CSV files: In addition to specifying the header option, you can also specify other options when reading CSV files with Spark. For example, you can use the delimiter option to specify the delimiter used in the CSV file (for example, option("delimiter", "|") for a pipe-delimited file) or the inferSchema option to automatically infer the data types of the columns in the DataFrame (for example, option("inferSchema", "true")).
  • Handling missing or malformed data: When reading CSV files with Spark, you may encounter missing or malformed data that can cause errors. To handle missing data, you can use the nullValue option to specify the value used to represent null values in the CSV file (for example, option("nullValue", "NA") for a file where "NA" represents null values). To handle malformed data, you can use the mode option to specify how to handle parsing errors (for example, option("mode", "PERMISSIVE") to ignore parsing errors and continue processing the file).
  • Working with large CSV files: When working with large CSV files, you may run into memory and performance issues if you try to load the entire file into a DataFrame at once. To avoid this, you can use the spark.read.csv() method with the maxColumns and maxCharsPerColumn options to limit the number of columns and characters per column that Spark reads at a time. You can also use the spark.readStream.csv() method to read large CSV files as a stream, which allows you to process the data in real time as it is read from disk. A short sketch combining the options from this list follows.
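
The following is a minimal sketch that combines several of the options mentioned in this list when reading the same netflix_titles.csv file; the specific values shown (the delimiter, the "NA" null marker, the parser limits, and the streaming input directory) are illustrative assumptions and should be adapted to your data:

    # Batch read with explicit reader options (values are illustrative)
    df_opts = (spark.read.format("csv")
        .option("header", "true")              # first row holds column names
        .option("delimiter", ",")              # use "|" for a pipe-delimited file
        .option("inferSchema", "true")         # let Spark infer column data types
        .option("nullValue", "NA")             # treat "NA" as null
        .option("mode", "PERMISSIVE")          # keep processing past malformed rows
        .option("maxColumns", 20480)           # parser limit on columns per row
        .option("maxCharsPerColumn", 100000)   # parser limit on characters per value
        .load("../data/netflix_titles.csv"))
    df_opts.printSchema()
    # Streaming read of CSV files from a directory; readStream requires an explicit schema
    stream_df = (spark.readStream.format("csv")
        .option("header", "true")
        .schema(schema)                        # the StructType defined earlier in this recipe
        .load("../data/"))                     # input directory is an assumption for illustration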

See also
