In this section, we'll see a real-life example of a classification problem. The idea is to develop a classifier that, given the values for sex, age, time, number of warts, type, and area, will predict whether a patient has to go through cryotherapy.
Getting started with learning
Description of the dataset
We will use a recently added cryotherapy dataset from the UCI machine learning repository. The dataset can be downloaded from http://archive.ics.uci.edu/ml/datasets/Cryotherapy+Dataset+#.
This dataset contains information about wart treatment results of 90 patients using cryotherapy. In case you don't know, a wart is a kind of skin problem caused by infection with a type of human papillomavirus. Warts are typically small, rough, and hard growths that are similar in color to the rest of the skin.
There are two available treatments for this problem:
- Salicylic acid: A type of gel containing salicylic acid used in medicated band-aids.
- Cryotherapy: A freezing liquid (usually nitrogen) is sprayed onto the wart. It will destroy the cells in the affected area. After the cryotherapy, usually, a blister develops, which eventually turns into a scab and falls off after a week or so.
There are 90 samples, or instances, of patients who were either recommended to go through cryotherapy or discharged without it. There are seven attributes in the dataset (sketched as a Scala case class after this list):
- sex: Patient gender, characterized by 1 (male) or 0 (female).
- age: Patient age.
- Time: Observation and treatment time in hours.
- Number_of_Warts: Number of warts.
- Type: Types of warts.
- Area: The amount of affected area.
- Result_of_Treatment: The recommended result of the treatment, characterized by either 1 (yes) or 0 (no). It is also the target column.
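To make the record structure concrete, here is a hypothetical Scala case class mirroring these seven columns. The field names and types are illustrative only; in the code that follows, we simply let Spark infer the schema from the CSV file:
// One row of the cryotherapy dataset, modeled as a Scala case class
case class CryoRecord(
  sex: Int, // 1 = male, 0 = female
  age: Int, // patient age
  time: Double, // observation and treatment time
  numberOfWarts: Int, // number of warts
  wartType: Int, // type of wart
  area: Int, // amount of affected area
  resultOfTreatment: Int // target label: 1 = yes, 0 = no
)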
As you can see, this is a classification problem because we have to predict discrete labels; more specifically, it is a binary classification problem. Since this is a small dataset with only six features, we can start with a very simple classifier. A natural candidate is logistic regression, where the logistic function is applied to a linear combination of the features to obtain the probability of a sample belonging to either class; however, to keep this first example compact, the code in this section trains a decision tree classifier instead. We will learn more about logistic regression, decision trees, and other classification algorithms in Chapter 3, Scala for Learning Classification. For this, we use the Spark ML-based implementation of the decision tree classifier in Scala.
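As a quick aside, here is a tiny sketch that makes the logistic function concrete (the function name and example scores below are purely illustrative; they are not part of the dataset or the Spark API):
def logistic(z: Double): Double = 1.0 / (1.0 + math.exp(-z)) // squashes a real-valued score into (0, 1)
println(logistic(0.0)) // 0.5 -- right on the decision boundary
println(logistic(3.0)) // ~0.95 -- strong evidence for the positive class (label 1)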
Configuring the programming environment
I am assuming that Java is already installed on your machine and JAVA_HOME is set too. Also, I'm assuming that your IDE has the Maven plugin installed. If so, then just create a Maven project and add the project properties as follows:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<jdk.version>1.8</jdk.version>
<spark.version>2.3.0</spark.version>
</properties>
In the preceding properties tag, I specified the Spark version (that is, 2.3.0), but you can adjust it. Then add the following dependencies in the pom.xml file:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-shuffle_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.11</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
Then, if everything goes smoothly, all the JAR files will be downloaded in the project home as Maven dependencies. Alright! Then we can start writing the code.
Getting started with Apache Spark
Since you're here to learn how to solve a real-life problem in Scala, it is worth exploring the available Scala libraries. Unfortunately, we don't have many options apart from Spark MLlib and ML, which can be used for classification very easily and comfortably. Importantly, they expose the commonly used classification algorithms through high-level interfaces. I assume that Scala, Java, and your favorite IDE, such as Eclipse or IntelliJ IDEA, are already configured on your machine. We will introduce some Spark concepts here without going into much detail, and we will continue learning about them in upcoming chapters.
First, I'll introduce SparkSession, which is the unified entry point of a Spark application, introduced in Spark 2.0. Technically, SparkSession is the gateway for interacting with Spark's functionality; the older constructs, such as SparkContext, HiveContext, and SQLContext, are all encapsulated within it. You may have created such a session before, probably without realizing it. A SparkSession can be created using the builder pattern, as follows:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder // the builder itself
.master("local[4]") // number of cores (i.e. 4, use * for all cores)
.config("spark.sql.warehouse.dir", "/temp") // Spark SQL Hive Warehouse location
.appName("SparkSessionExample") // name of the Spark application
.getOrCreate() // get the existing session or create a new one
The preceding builder will try to get an existing SparkSession or create a new one. Then the newly created SparkSession will be assigned as the global default.
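Because the SparkContext and SQL functionality are encapsulated in the session, you can reach them directly from it. For example, the following quick check prints the Spark version the session is running on and grabs the underlying SparkContext:
println(spark.version) // the Spark version the session is running on (for example, 2.3.0)
val sc = spark.sparkContext // the underlying SparkContext; there is no need to construct it separately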
Creating a DataFrame is probably the most important step in any data analytics task. Spark provides a read API that can be used to read data from numerous sources in various formats, such as CSV, JSON, Avro, and JDBC. For example, the following code snippet shows how to read a CSV file and create a Spark DataFrame:
val dataDF = spark.read
.option("header", "true") // we read the header to know the column and structure
.option("inferSchema", "true") // we infer the schema preserved in the CSV
.format("com.databricks.spark.csv") // we're using the CSV reader from DataBricks
.load("data/inputData.csv") // Path of the CSV file
.cache // [Optional] cache if necessary
Once a DataFrame is created, we can see a few samples (that is, rows) by invoking the show() method, as well as print the schema using the printSchema() method. Invoking describe().show() will show the statistics about the DataFrame:
dataDF.show() // shows the first 20 rows by default
dataDF.printSchema() // shows the schema (including column name and type)
dataDF.describe().show() // shows descriptive statistics
In many cases, we have to import spark.implicits._, which is one of the most useful imports; it provides a lot of implicit methods for converting Scala objects to Datasets and vice versa. Once we have created a DataFrame, we can create a temporary or global view for running SQL queries, using the createOrReplaceTempView() method or the createGlobalTempView() method, respectively:
dataDF.createOrReplaceTempView("myTempDataFrame") // create or replace a local temporary view with dataDF
dataDF.createGlobalTempView("myGloDataFrame") // create a global temporary view with dataframe dataDF
Now a SQL query can be issued to see the data in tabular format:
spark.sql("SELECT * FROM myTempDataFrame")// will show all the records
To drop these views, spark.catalog.dropTempView("myTempDataFrame") or spark.catalog.dropGlobalTempView("myGloDataFrame") can be invoked, respectively. Once you're done, simply invoke the spark.stop() method; it will destroy the SparkSession and release all the resources allocated by the Spark application. Interested readers can consult the detailed API documentation at https://spark.apache.org/ for more information.
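For completeness, the clean-up calls look like the following. Don't invoke stop() yet if you intend to keep working with the session, as we do in the rest of this example:
spark.catalog.dropTempView("myTempDataFrame") // drop the local temporary view
spark.catalog.dropGlobalTempView("myGloDataFrame") // drop the global temporary view
spark.stop() // destroy the session and release all allocated resources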
Reading the training dataset
There is a Cryotherapy.xlsx Excel file, which contains data as well as data usage agreement texts. So, I just copied the data and saved it in a CSV file named Cryotherapy.csv. Let's start by creating SparkSession—the gateway to access Spark:
val spark = SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "/temp")
.appName("CryotherapyPrediction")
.getOrCreate()
import spark.implicits._
Then let's read the training set and take a quick look at it:
var CryotherapyDF = spark.read.option("header", "true")
.option("inferSchema", "true")
.csv("data/Cryotherapy.csv")
Let's check whether the preceding CSV reader managed to read the data properly, including the header and the column types:
CryotherapyDF.printSchema()
As can be seen from the following output, the schema of the Spark DataFrame has been correctly identified. Also, as expected, all the features required by our ML algorithm are numeric (in other words, in integer or double format):
A snapshot of the dataset can be seen using the show() method. We can limit the number of rows; here, let's say 5:
CryotherapyDF.show(5)
The output of the preceding line of code shows the first five samples of the DataFrame:
Preprocessing and feature engineering
As per the dataset description on the UCI machine learning repository, there are no null values. Also, Spark ML-based classifiers expect numeric values in order to model them. The good thing is that, as seen in the schema, all the required fields are numeric (that is, either integers or floating-point values). Additionally, the Spark ML algorithms expect a label column, which in our case is Result_of_Treatment. Let's rename it to label using the Spark-provided withColumnRenamed() method:
//Spark ML algorithms expect a 'label' column, which in our case is 'Result_of_Treatment'. Let's rename it to 'label'
CryotherapyDF = CryotherapyDF.withColumnRenamed("Result_of_Treatment", "label")
CryotherapyDF.printSchema()
All Spark ML-based classifiers expect training data containing two columns, called label (which we already have) and features. We have seen that we have six features. However, those features have to be assembled into a single feature vector. This can be done using the VectorAssembler() method, which is a transformer from the Spark ML library. But first we need to select all the columns except the label column:
val selectedCols = Array("sex", "age", "Time", "Number_of_Warts", "Type", "Area")
Then we instantiate a VectorAssembler() transformer and transform as follows:
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler()
.setInputCols(selectedCols)
.setOutputCol("features")
val numericDF = vectorAssembler.transform(CryotherapyDF)
.select("label", "features")
numericDF.show()
As expected, the last line of the preceding code segment shows the assembled DataFrame having label and features, which are needed to train an ML algorithm:
Preparing training data and training a classifier
Next, we split the data into training and test sets. Let's say that 80% of the data will be used for training and the remaining 20% will be used to evaluate the trained model:
val splits = numericDF.randomSplit(Array(0.8, 0.2))
val trainDF = splits(0)
val testDF = splits(1)
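If you want the split to be reproducible across runs, randomSplit() also accepts a seed; the seed value and the seededSplits name below are arbitrary and only for illustration:
val seededSplits = numericDF.randomSplit(Array(0.8, 0.2), seed = 12345L) // same split as above, but deterministic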
We then instantiate a decision tree classifier by specifying the impurity measure, the maximum number of bins, and the maximum depth of the tree. Additionally, we set the label and feature columns:
import org.apache.spark.ml.classification.DecisionTreeClassifier
val dt = new DecisionTreeClassifier()
.setImpurity("gini")
.setMaxBins(10)
.setMaxDepth(30)
.setLabelCol("label")
.setFeaturesCol("features")
Now that the data and the classifier are ready, we can perform the training:
val dtModel = dt.fit(trainDF)
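Optionally, you can peek at the learned tree structure, which is a handy sanity check for such a small model:
println(dtModel.toDebugString) // prints the if-then-else splits learned by the decision tree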
Evaluating the model
Since it's a binary classification problem, we can use the BinaryClassificationEvaluator() estimator to evaluate the model's performance on the test set. Note that, by default, this evaluator measures the area under the ROC curve rather than plain accuracy:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
Now that the training is complete and we have a trained decision tree model, we can use it to make predictions on the test set:
val predictionDF = dtModel.transform(testDF)
Finally, we compute the evaluation metric, which, as noted previously, is the area under the ROC curve:
val auc = evaluator.evaluate(predictionDF)
println("Area under ROC = " + auc)
You should see an area under the ROC curve of about 0.96:
Area under ROC = 0.9675436785432
Finally, we stop the SparkSession by invoking the stop() method:
spark.stop()
We have managed to achieve an area under the ROC curve of about 0.96 with minimal effort. However, there are other performance metrics, such as accuracy, precision, recall, and the F1 measure, which we will discuss in upcoming chapters. Also, if you're new to ML and haven't understood all the steps in this example, don't worry; we'll recap all of these steps with various other examples in the following chapters.
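As a quick preview, here is a minimal sketch of how those metrics could be computed for this model using Spark's MulticlassClassificationEvaluator, which also handles the binary case. The metricEval name is only for illustration, and the sketch reuses the predictionDF computed on the test set earlier:
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val metricEval = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
val accuracy = metricEval.setMetricName("accuracy").evaluate(predictionDF)
val precision = metricEval.setMetricName("weightedPrecision").evaluate(predictionDF)
val recall = metricEval.setMetricName("weightedRecall").evaluate(predictionDF)
val f1 = metricEval.setMetricName("f1").evaluate(predictionDF)
println(s"Accuracy = $accuracy, Precision = $precision, Recall = $recall, F1 = $f1")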