Let's look at some data properties (use the EDA.scala file for this). First, we need to read the training set to see the available properties. Place the training set in your project directory (or somewhere else) and point to it accordingly:
val train = "data/insurance_train.csv"
I assume that you already have Java, Scala, and Spark installed and configured on your machine; if not, please do so first. Either way, let's create an active Spark session, which is the gateway for any Spark application:
val spark = SparkSessionCreate.createSession()
import spark.implicits._
Spark session alias on Scala REPL:
If you are working inside the Spark shell (Scala REPL), the Spark session alias spark is already defined, so you can just get going.
Here, I have a method called createSession() under the object SparkSessionCreate, which goes as follows:
import org.apache.spark.sql.SparkSession

object SparkSessionCreate {
  def createSession(): SparkSession = {
    val spark = SparkSession
      .builder
      .master("local[*]") // adjust accordingly
      .config("spark.sql.warehouse.dir", "E:/Exp/") // change accordingly
      .appName("MySparkSession") // change accordingly
      .getOrCreate()
    spark
  }
}
Since this will be used frequently throughout this book, I decided to create a dedicated method for it. So, let's load, parse, and create a DataFrame using the Databricks CSV reader (also known as com.databricks.spark.csv), since our dataset comes in .csv format.
At this point, I have to interrupt you with something very useful. Since we will be using Spark MLlib and ML APIs in upcoming chapters too, it is worth fixing a few issues in advance. If you're a Windows user, let me tell you about a very weird issue that you may experience while working with Spark.
The thing is that Spark works on Windows, macOS, and Linux. While using Eclipse or IntelliJ IDEA to develop your Spark applications (or submitting Spark jobs locally) on Windows, you might face an I/O exception and, consequently, your application might not compile successfully or may be interrupted.
The reason is that Spark expects a runtime environment for Hadoop on Windows. Unfortunately, the binary distribution of Spark (v2.2.0, for example) does not contain some Windows native components (for example, winutils.exe, hadoop.dll, and so on). However, these are required (not optional) to run Hadoop on Windows. Therefore, if you cannot provide this runtime environment, you will see an I/O exception like the following:
24/01/2018 11:11:10
ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
Now there are two ways to tackle this issue on Windows:
- From an IDE such as Eclipse or IntelliJ IDEA: Download winutils.exe from https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin/ and copy it into the bin folder of your Spark distribution, for example, spark-2.2.0-bin-hadoop2.7/bin/. Then select the project | Run Configurations... | Environment | New | create a variable named HADOOP_HOME and put the path in the value field, for example, c:/spark-2.2.0-bin-hadoop2.7/ | OK | Apply | Run. Then you're done!
- With local Spark job submit: Set the Hadoop home directory (the folder whose bin subdirectory contains winutils.exe) as a system property in the Spark code, for example, System.setProperty("hadoop.home.dir", "c:\\spark-2.2.0-bin-hadoop2.7"), as shown in the short sketch after this list.
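To make the placement concrete, here is a minimal sketch of the second option, assuming the example path used above (adjust it to your own layout); the key point is that the property must be set before the Spark session is created:
// Windows only: hadoop.home.dir must point to the folder whose bin
// subdirectory contains winutils.exe; set it before creating the session
System.setProperty("hadoop.home.dir", "c:\\spark-2.2.0-bin-hadoop2.7") // example path, adjust accordingly
val spark = SparkSessionCreate.createSession()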
Alright, let's come back to our original discussion. In the following code block, we read the header of the CSV file, which is applied directly to the column names of the DataFrame being created, and the inferSchema property is set to true. If you don't specify the inferSchema configuration explicitly, the float values will be treated as strings. This might cause VectorAssembler to raise an exception such as java.lang.IllegalArgumentException: Data type StringType is not supported:
val trainInput = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .format("com.databricks.spark.csv")
  .load(train)
  .cache
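As a side note, since Spark 2.x the Databricks CSV reader ships with Spark itself, so the same read can, if you prefer, be expressed with the built-in csv source. Here is a minimal, equivalent sketch (the name trainInputAlt is only illustrative); the rest of the examples stick with the com.databricks.spark.csv form shown above:
// Equivalent read using Spark's built-in CSV source (Spark 2.x and later)
val trainInputAlt = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(train)
  .cache()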
Now let's print the schema of the DataFrame we just created. I have abridged the output and shown only a few columns:
trainInput.printSchema()
root
|-- id: integer (nullable = true)
|-- cat1: string (nullable = true)
|-- cat2: string (nullable = true)
|-- cat3: string (nullable = true)
...
|-- cat115: string (nullable = true)
|-- cat116: string (nullable = true)
...
|-- cont14: double (nullable = true)
|-- loss: double (nullable = true)
You can see that there are 116 categorical columns holding the categorical features. There are also 14 numerical (continuous) feature columns.
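If you would rather derive these counts than read them off the schema, the column-name prefixes make it easy; here is a minimal sketch, assuming the cat/cont naming convention shown above:
// Count the categorical and continuous feature columns by their name prefix
val catCols = trainInput.columns.filter(_.startsWith("cat"))
val contCols = trainInput.columns.filter(_.startsWith("cont"))
println(s"Categorical columns: ${catCols.length}") // 116
println(s"Continuous columns: ${contCols.length}") // 14
Now let's see how many rows there are in the dataset using the count() method: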
println(trainInput.count())
>>>
188318
That is a reasonably large number of rows for training an ML model. Alright, now let's see a snapshot of the dataset using the show() method, but with only some selected columns so that it makes more sense. Feel free to use trainInput.show() to see all columns:
df.select("id", "cat1", "cat2", "cat3", "cont1", "cont2", "cont3", "loss").show()
>>>
Nevertheless, if you look at all the columns using trainInput.show(), you will see that some categorical columns contain a very large number of categories. To be more specific, the categorical columns cat109 to cat116 contain too many categories, as follows:
trainInput.select("cat109", "cat110", "cat112", "cat113", "cat116").show()
>>>
In later stages, it would be worth dropping these columns to reduce the skewness in the dataset. Note that, in statistics, skewness is a measure of the asymmetry of the probability distribution of a real-valued random variable about its mean.
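If you want to quantify how many distinct levels those high-cardinality columns actually have before deciding to drop them, countDistinct offers a quick check; a minimal sketch follows (the column list is only illustrative):
// Number of distinct categories in a few of the high-cardinality columns
import org.apache.spark.sql.functions.countDistinct
trainInput.select(
  countDistinct("cat109"), countDistinct("cat110"),
  countDistinct("cat112"), countDistinct("cat113"),
  countDistinct("cat116")
).show()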
Now that we have seen a snapshot of the dataset, it is worth looking at some other statistics, such as the average claim (or loss) and the minimum and maximum loss, using Spark SQL. But before that, let's rename the last column from loss to label, since the ML model will otherwise complain about it. Even after using setLabelCol on the regression model, it still looks for a column called label. This results in an annoying error saying org.apache.spark.sql.AnalysisException: cannot resolve 'label' given input columns:
val newDF = trainInput.withColumnRenamed("loss", "label")
Now, since we want to execute an SQL query, we need to create a temporary view so that the operation can be performed in memory:
newDF.createOrReplaceTempView("insurance")
Now let's compute the average of the damage claimed by the clients:
spark.sql("SELECT avg(insurance.label) as AVG_LOSS FROM insurance").show()
>>>
+------------------+
| AVG_LOSS |
+------------------+
|3037.3376856699924|
+------------------+
Similarly, let's see the lowest claim made so far:
spark.sql("SELECT min(insurance.label) as MIN_LOSS FROM insurance").show()
>>>
+--------+
|MIN_LOSS|
+--------+
| 0.67|
+--------+
And let's see the highest claim made so far:
spark.sql("SELECT max(insurance.label) as MAX_LOSS FROM insurance").show()
>>>
+---------+
| MAX_LOSS|
+---------+
|121012.25|
+---------+
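As a side note, the same three statistics can be computed with the DataFrame API instead of Spark SQL, if that reads better to you; a minimal sketch:
// Equivalent aggregation using the DataFrame API rather than Spark SQL
import org.apache.spark.sql.functions.{avg, max, min}
newDF.agg(
  avg("label").as("AVG_LOSS"),
  min("label").as("MIN_LOSS"),
  max("label").as("MAX_LOSS")
).show()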
Since neither Scala nor Java comes with a handy visualization library, I could not show any charts here.
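If you still want a quick, text-only summary in place of plots, describe() prints the count, mean, standard deviation, minimum, and maximum of the selected columns; a small sketch over a few illustrative columns:
// Basic summary statistics as a text-only substitute for charts
newDF.select("cont1", "cont2", "cont3", "label").describe().show()
Now let's focus on data preprocessing before we prepare our training set.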