Reading CSV data with Apache Spark
Reading CSV data is a common task in data engineering and analysis, and Apache Spark provides a powerful and efficient way to process it. Spark supports various file formats, including CSV, and offers many options for reading and parsing such data. In this recipe, we will learn how to read CSV data with Apache Spark using Python.
How to do it...
- Import libraries: Import the required libraries and create a `SparkSession` object:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("read-csv-data")
    .master("spark://spark-master:7077")
    .config("spark.executor.memory", "512m")
    .getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
```
- Read the CSV data with an inferred schema: Read the CSV file using the `read` method of `SparkSession`. In the following code, we specify the format of the file as `csv` using the `format` method of the `DataFrameReader`. We then set the `header` option to `true` to indicate that the first row of the CSV file contains the column names. Finally, we specify the path to the CSV file to load it into a DataFrame using the `load` method:

```python
df = (spark.read.format("csv")
    .option("header", "true")
    .load("../data/netflix_titles.csv"))
```
Note
If your CSV file does not have a header row, you can set the `header` option to `false`, as shown:

```python
df = (spark.read.format("csv")
    .option("header", "false")
    .load("../data/netflix_titles.csv"))
```
- Display sample data in the DataFrame: You can display the contents of the DataFrame using the `show()` method. This will display the first 20 rows of the DataFrame. If you want to display more or fewer rows, you can pass an integer argument to the `show()` method:

```python
# Display contents of DataFrame
df.show()
# Alternatively, show 10 rows without truncating long values
# df.show(10, truncate=False)
```
- Read the CSV data with an explicit schema: First, you need to define the schema for your CSV file. You can do this using the `StructType` and `StructField` classes in Spark SQL. For example, if your CSV file had three columns, `"name"`, `"age"`, and `"gender"`, and you wanted to specify that `"name"` is a string, `"age"` is an integer, and `"gender"` is a string, you would define a `StructType` with three corresponding `StructField` entries. For the Netflix titles dataset used in this recipe, the schema looks like this:

```python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

schema = StructType([
    StructField("show_id", StringType(), True),
    StructField("type", StringType(), True),
    StructField("title", StringType(), True),
    StructField("director", StringType(), True),
    StructField("cast", StringType(), True),
    StructField("country", StringType(), True),
    StructField("date_added", DateType(), True),
    StructField("release_year", IntegerType(), True),
    StructField("rating", StringType(), True),
    StructField("duration", StringType(), True),
    StructField("listed_in", StringType(), True),
    StructField("description", StringType(), True)])
```
- Read the CSV file: Next, you need to read the CSV file using the `spark.read.format("csv")` method and pass the schema to it as an argument. In this example, we're reading a CSV file located at `../data/netflix_titles.csv`, with the first row as the header, and using the previously defined schema (a short sketch for verifying that the schema was applied follows these steps):

```python
df = (spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("../data/netflix_titles.csv"))
```
- You can display the contents of the DataFrame using the `show()` method. This will display the first 20 rows of the DataFrame. If you want to display more or fewer rows, you can change the `n` argument to the `show()` method:

```python
df.show()
```
- Finally, we need to stop the Spark session to release the resources used by Spark:

```python
spark.stop()
```
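Before stopping the session, you can also confirm that the explicit schema from step 4 was applied rather than the default behavior of reading every column as a string. This is a minimal check, assuming the `df` DataFrame loaded in step 5:

```python
# Print the schema Spark applied to the DataFrame; date_added should appear
# as a date column and release_year as an integer, matching the explicit schema
df.printSchema()
```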
Common issues faced while working with CSV data
Let’s take a look at some issues frequently encountered when working with CSV data:
- Issue: Delimiter value is present within the data.
Solution: You can use the `option("escapeQuotes", "true")` method while reading the CSV file to specify how values that contain the delimiter character are handled. In the following code, we specified that quoted values containing the delimiter character are escaped so that they are read as a single column value:

```python
df = (spark.read.format("csv")
    .option("header", "true")
    .option("nullValue", "null")
    .option("escapeQuotes", "true")
    .schema(schema)
    .load("../data/netflix_titles.csv"))
```
- Issue: Null values and empty values are not handled correctly.
Solution: You can use the `option("nullValue", "null")` method while reading the CSV file to specify how null values are represented in the CSV file. You can also use the `option("emptyValue", "")` method to specify how empty values are represented. In the following code, we specified `"null"` as the representation of null values and `""` as the representation of empty values in the CSV file. You can specify any other string or character that represents null values in your CSV file:

```python
df = (spark.read.format("csv")
    .option("header", "true")
    .option("nullValue", "null")
    .option("emptyValue", "")
    .schema(schema)
    .load("../data/netflix_titles.csv"))
```
- Issue: Date formats are different and not handled correctly.
Solution: You can use the `option("dateFormat", "LLLL d, y")` method while reading the CSV file to specify how `date` column values are represented in the CSV file. In the following code, we specified that `date` column values are formatted as `"LLLL d, y"` (a full month name followed by the day and year). You can specify any other datetime pattern for formatting and parsing dates in a CSV file:

```python
df = (spark.read.format("csv")
    .option("header", "true")
    .option("nullValue", "null")
    .option("dateFormat", "LLLL d, y")
    .schema(schema)
    .load("../data/netflix_titles.csv"))
```
Note
When we used the `read` API in step 2, Apache Spark did not execute any jobs. This is because Apache Spark uses a lazy evaluation technique to delay the execution of transformations until an action is called. This allows Spark to optimize the execution plan and recover from failures. However, it can also lead to some challenges in debugging and troubleshooting. To work effectively with lazy evaluation, it's important to understand the distinction between transformations and actions and to consider the order and timing of transformations and actions in your code.
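For a minimal illustration of this distinction, the following sketch reuses the `df` DataFrame from the recipe; the column names come from the schema defined in step 4, and the `"Movie"` filter value is only an illustrative example:

```python
# Transformations such as filter() and select() are lazy: they only build
# up a logical plan and do not trigger a Spark job
movies = df.filter(df["type"] == "Movie").select("title", "release_year")

# Actions such as count() or show() force Spark to execute the plan
print(movies.count())
movies.show(5)
```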
There’s more…
Here are some additional details to help you become more knowledgeable about reading CSV data with Apache Spark:
- Specifying options when reading CSV files: In addition to specifying the `header` option, you can also specify other options when reading CSV files with Spark. For example, you can use the `delimiter` option to specify the delimiter used in the CSV file (for example, `option("delimiter", "|")` for a pipe-delimited file) or the `inferSchema` option to automatically infer the data types of the columns in the DataFrame (for example, `option("inferSchema", "true")`). A short sketch combining several of these options follows this list.
- Handling missing or malformed data: When reading CSV files with Spark, you may encounter missing or malformed data that can cause errors. To handle missing data, you can use the `nullValue` option to specify the value used to represent null values in the CSV file (for example, `option("nullValue", "NA")` for a file where `"NA"` represents null values). To handle malformed data, you can use the `mode` option to specify how to handle parsing errors (for example, `option("mode", "PERMISSIVE")` to ignore parsing errors and continue processing the file); this option is also included in the sketch after this list.
- Working with large CSV files: When working with large CSV files, you may run into memory and performance issues if you try to load the entire file into a DataFrame at once. You can use the `spark.read.csv()` method with the `maxColumns` and `maxCharsPerColumn` options to put an upper bound on the number of columns and the number of characters per column value that Spark will parse. You can also use the `spark.readStream.csv()` method to read large CSV files as a stream, which allows you to process the data in real time as it is read from disk; see the streaming sketch after this list.
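To tie the first two points together, here is a minimal sketch that combines several of these read options. The pipe-delimited file path and the `"NA"` null marker are hypothetical examples, not files that ship with this recipe:

```python
# Hypothetical pipe-delimited file in which "NA" marks missing values
df_pipe = (spark.read.format("csv")
    .option("header", "true")
    .option("delimiter", "|")        # pipe-delimited input
    .option("inferSchema", "true")   # infer column data types
    .option("nullValue", "NA")       # treat "NA" as null
    .option("mode", "PERMISSIVE")    # continue past malformed rows
    .load("../data/sales_pipe.csv"))
df_pipe.printSchema()
```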
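For the streaming approach, the following is a minimal sketch, assuming a hypothetical `../data/incoming_csv/` directory that receives CSV files over time; streaming file sources require an explicit schema, so the `schema` object from step 4 is reused:

```python
# Read CSV files from a folder as a stream; new files are processed incrementally
stream_df = (spark.readStream.format("csv")
    .option("header", "true")
    .schema(schema)                  # streaming file sources need an explicit schema
    .load("../data/incoming_csv/"))

# Write the streaming results to the console for inspection
query = (stream_df.writeStream
    .format("console")
    .outputMode("append")
    .start())
# query.awaitTermination()  # block until the stream is stopped
```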
See also
- CSV Files – Spark 3.4.0 documentation: https://spark.apache.org/docs/latest/sql-data-sources-csv.html
- Data Types – Spark 3.4.0 documentation: https://spark.apache.org/docs/latest/sql-ref-datatypes.html
- Datetime Patterns for Formatting and Parsing – Spark 3.4.0 documentation: https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html