Chapter 04: Diving Deeper with Spark
Activity 9: Getting Started with Spark DataFrames
If you are using Google Colab to run the Jupyter notebook, add these lines to set up the environment:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
Install findspark, if it is not already installed, using the following command:
pip install -q findspark
To create a sample DataFrame by manually specifying the schema, first import the findspark module to connect Jupyter with Spark:
import findspark
findspark.init()
import pyspark
import os
Create the SparkContext and SQLContext, then specify the schema using Row and create the DataFrame:
sc = pyspark.SparkContext()
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
from pyspark.sql import *
# Define the schema as a Row and create one Row per record
na_schema = Row("Name", "Subject", "Marks")
row1 = na_schema("Ankit", "Science", 95)
row2 = na_schema("Ankit", "Maths", 86)
row3 = na_schema("Preity", "Maths", 92)
na_list = [row1, row2, row3]
df_na = sqlc.createDataFrame(na_list)
type(df_na)
The output is as follows:
pyspark.sql.dataframe.DataFrame
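The schema can also be specified explicitly with a StructType, which additionally pins down the column types. The following is a sketch of this alternative (df_na2 is just an illustrative name, not part of the activity):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Explicit schema: two string columns and one integer column
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Subject", StringType(), True),
    StructField("Marks", IntegerType(), True)
])
df_na2 = sqlc.createDataFrame(na_list, schema)
df_na2.printSchema()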
Check the DataFrame using the following command:
df_na.show()
The output is as follows:
Create a sample DataFrame from an existing RDD. First, create the RDD as illustrated here:
data = [("Ankit","Science",95),("Preity","Maths",86),("Ankit","Maths",86)] data_rdd = sc.parallelize(data) type(data_rdd)
The output is as follows:
pyspark.rdd.RDD
Convert the RDD to a DataFrame using the following command:
data_df = sqlc.createDataFrame(data_rdd)
data_df.show()
The output is as follows:
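A DataFrame built this way names its columns _1, _2, and _3 by default. As a sketch (the column names below are assumptions, not part of the activity), a list of names can be passed to createDataFrame to label them:

# Pass column names so the DataFrame does not use the default _1, _2, _3
data_df = sqlc.createDataFrame(data_rdd, ['Name', 'Subject', 'Marks'])
data_df.show()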
Create a sample DataFrame by reading the data from a CSV file:
df = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('mtcars.csv')
type(df)
The output is as follows:
pyspark.sql.dataframe.DataFrame
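Note that com.databricks.spark.csv is the external Databricks CSV package; since Spark 2.0, an equivalent reader is built in. A minimal sketch, assuming Spark 2.x:

# Built-in CSV reader (Spark 2.x), equivalent to the Databricks format above
df = sqlc.read.csv('mtcars.csv', header=True, inferSchema=True)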
Print the first seven rows of the DataFrame:
df.show(7)
The output is as follows:
Print the schema of the DataFrame:
df.printSchema()
The output is as follows:
Print the number of rows and columns in the DataFrame:
print('number of rows:' + str(df.count()))
print('number of columns:' + str(len(df.columns)))
The output is as follows:
number of rows:32
number of columns:11
Print the summary statistics of the DataFrame and of any two individual columns:
df.describe().show()
The output is as follows:
Print the summary of any two columns:
df.describe(['mpg','cyl']).show()
The output is as follows:
Write the first seven rows of the sample DataFrame to a CSV file:
df_p = df.toPandas()
df_p.head(7).to_csv("mtcars_head.csv")
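Converting to pandas is convenient here because the subset is tiny. As an alternative sketch, Spark can write the subset itself; note that Spark writes a directory of part files rather than a single CSV, and the output path below is an assumption:

# Writes a directory of part files, not a single CSV file
df.limit(7).write.csv('mtcars_head_spark', header=True, mode='overwrite')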
Activity 10: Data Manipulation with Spark DataFrames
Install the packages as illustrated here:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
Then, import the findspark module to connect Jupyter with Spark using the following command:
import findspark
findspark.init()
import pyspark
import os
Now, create the SparkContext and SQLContext as illustrated here:
sc = pyspark.SparkContext()
from pyspark.sql import SQLContext
sqlc = SQLContext(sc)
Create a DataFrame in Spark as illustrated here:
df = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('mtcars.csv')
df.show(4)
The output is as follows:
Rename any five columns of the DataFrame using the following command:
data = df
new_names = ['mpg_new', 'cyl_new', 'disp_new', 'hp_new', 'drat_new']
# Rename the first five columns one at a time
for i, z in zip(data.columns[0:5], new_names):
    data = data.withColumnRenamed(str(i), str(z))
data.columns
The output is as follows:
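An equivalent sketch using select with column aliases, which renames all five columns in a single pass (again assuming only the first five columns change):

from pyspark.sql.functions import col
# Alias the first five columns; keep the remaining columns unchanged
renamed = [col(c).alias(n) for c, n in zip(df.columns[0:5], new_names)]
rest = [col(c) for c in df.columns[5:]]
data = df.select(renamed + rest)
data.columns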
Select any two numeric and one categorical column from the DataFrame:
data = df.select(['cyl', 'mpg', 'hp'])
data.show(5)
The output is as follows:
Count the number of distinct categories in the categorical variable:
data.select('cyl').distinct().count() #3
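The same count can be computed as an aggregate; a sketch using the built-in countDistinct function:

from pyspark.sql.functions import countDistinct
# Aggregate-function form of the distinct count
data.agg(countDistinct('cyl')).show()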
Create two new columns in the DataFrame by summing and multiplying the two numerical columns:
data = data.withColumn('colsum', (df['mpg'] + df['hp']))
data = data.withColumn('colproduct', (df['mpg'] * df['hp']))
data.show(5)
The output is as follows:
Drop both the original numerical columns:
data = data.drop('mpg', 'hp')
data.show(5)
Sort the data by categorical column:
data = data.orderBy(data.cyl)
data.show(5)
The output is as follows:
Calculate the mean of the summation column for each distinct category in the categorical variable:
data.groupby('cyl').agg({'colsum':'mean'}).show()
The output is as follows:
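The dictionary form names the result column avg(colsum). A sketch using pyspark.sql.functions produces the same numbers with an explicit alias (mean_colsum is an assumed name):

from pyspark.sql.functions import mean
# Same aggregation, with a friendlier name for the result column
data.groupby('cyl').agg(mean('colsum').alias('mean_colsum')).show()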
Filter the rows with values greater than the mean of all the mean values calculated in the previous step:
data.count()
cyl_avg = data.groupby('cyl').agg({'colsum':'mean'})
# Mean of the per-category means
avg = cyl_avg.agg({'avg(colsum)':'mean'}).toPandas().iloc[0,0]
data = data.filter(data.colsum > avg)
data.count() # 15
data.show(5)
The output is as follows:
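The round trip through pandas is not required here; as a sketch, collect() pulls the single aggregate value straight from Spark:

# Fetch the scalar mean-of-means without converting to pandas
avg = cyl_avg.agg({'avg(colsum)':'mean'}).collect()[0][0]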
De-duplicate the resultant DataFrame to make sure it has all unique records:
data = data.dropDuplicates() data.count()
The output is 15.
Activity 11: Graphs in Spark
Import the required Python libraries in the Jupyter Notebook:
import pandas as pd
import os
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
Read and show the data from the CSV file using the following command:
df = pd.read_csv('mtcars.csv')
df.head()
The output is as follows:
Visualize the discrete frequency distribution of any continuous numeric variable from your dataset using a histogram:
plt.hist(df['mpg'], bins=20)
plt.ylabel('Frequency')
plt.xlabel('Values')
plt.title('Frequency distribution of mpg')
plt.show()
The output is as follows:
Visualize the percentage share of the categories in the dataset using a pie chart:
## Calculate count of records for each gear
data = pd.DataFrame([[3, 4, 5], df['gear'].value_counts().tolist()]).T
data.columns = ['gear', 'gear_counts']
## Visualizing percentage contribution of each gear using a pie chart
plt.pie(data.gear_counts, labels=data.gear, startangle=90, autopct='%.1f%%')
plt.title('Percentage contribution of each gear')
plt.show()
The output is as follows:
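The hardcoded list [3, 4, 5] happens to match the descending-frequency order returned by value_counts() for this dataset (gear 3 is the most common). A sketch that pairs labels with counts directly, so the chart stays correct for any ordering:

# Use the value_counts index as labels so counts and gears always align
counts = df['gear'].value_counts()
plt.pie(counts.values, labels=counts.index, startangle=90, autopct='%.1f%%')
plt.title('Percentage contribution of each gear')
plt.show()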
Plot the distribution of a continuous variable across the categories of a categorical variable using a boxplot:
sns.boxplot(x='gear', y='disp', data=df)
plt.show()
The output is as follows:
Visualize the values of a continuous numeric variable using a line chart:
data = df[['hp']]
data.plot(linestyle='-')
plt.title('Line Chart for hp')
plt.ylabel('Values')
plt.xlabel('Row number')
plt.show()
The output is as follows:
Plot the values of multiple continuous numeric variables on the same line chart:
data = df[['hp', 'disp', 'mpg']]
data.plot(linestyle='-')
plt.title('Line Chart for hp, disp & mpg')
plt.ylabel('Values')
plt.xlabel('Row number')
plt.show()
The output is as follows: