Search icon CANCEL
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Conferences
Free Learning
Arrow right icon
Scala Machine Learning Projects
Scala Machine Learning Projects

Scala Machine Learning Projects: Build real-world machine learning and deep learning projects with Scala

Arrow left icon
Profile Icon Karim
Arrow right icon
€18.99 per month
Paperback Jan 2018 470 pages 1st Edition
eBook
€20.98 €29.99
Paperback
€36.99
Subscription
Free Trial
Renews at €18.99p/m
Arrow left icon
Profile Icon Karim
Arrow right icon
€18.99 per month
Paperback Jan 2018 470 pages 1st Edition
eBook
€20.98 €29.99
Paperback
€36.99
Subscription
Free Trial
Renews at €18.99p/m
eBook
€20.98 €29.99
Paperback
€36.99
Subscription
Free Trial
Renews at €18.99p/m

What do you get with a Packt Subscription?

Free for first 7 days. $19.99 p/m after that. Cancel any time!
Product feature icon Unlimited ad-free access to the largest independent learning library in tech. Access this title and thousands more!
Product feature icon 50+ new titles added per month, including many first-to-market concepts and exclusive early access to books as they are being written.
Product feature icon Innovative learning tools, including AI book assistants, code context explainers, and text-to-speech.
Product feature icon Thousands of reference materials covering every tech concept you need to stay up to date.
Subscribe now
View plans & pricing
Table of content icon View table of contents Preview book icon Preview Book

Scala Machine Learning Projects

Analyzing Insurance Severity Claims

Predicting the cost, and hence the severity, of claims in an insurance company is a real-life problem that needs to be solved in an accurate way. In this chapter, we will show you how to develop a predictive model for analyzing insurance severity claims using some of the most widely used regression algorithms.

We will start with simple linear regression (LR) and we will see how to improve the performance using some ensemble techniques, such as gradient boosted tree (GBT) regressors. Then we will look at how to boost the performance with Random Forest regressors. Finally, we will show you how to choose the best model and deploy it for a production-ready environment. Also, we will provide some background studies on machine learning workflow, hyperparameter tuning, and cross-validation.

For the implementation, we will use Spark ML API for faster computation and massive scalability. In a nutshell, we will learn the following topics throughout this end-to-end project:

  • Machine learning and learning workflow
  • Hyperparameter tuning and cross-validation of ML models
  • LR for analyzing insurance severity claims
  • Improving performance with gradient boosted regressors
  • Boosting the performance with random forest regressors
  • Model deployment

Machine learning and learning workflow

Machine learning (ML) is about using a set of statistical and mathematical algorithms to perform tasks such as concept learning, predictive modeling, clustering, and mining useful patterns can be performed. The ultimate goal is to improve the learning in such a way that it becomes automatic, so that no more human interactions are needed, or to reduce the level of human interaction as much as possible.

We now refer to a famous definition of ML by Tom M. Mitchell (Machine Learning, Tom Mitchell, McGraw Hill, 1997), where he explained what learning really means from a computer science perspective:

"A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P, if its performance at tasks in T, as measured by P, improves with experience E."

Based on the preceding definition, we can conclude that a computer program or machine can do the following:

  • Learn from data and histories
  • Be improved with experience
  • Interactively enhance a model that can be used to predict an outcome

A typical ML function can be formulated as a convex optimization problem for finding a minimizer of a convex function f that depends on a variable vector w (weights), which has d records. Formally, we can write this as the following optimization problem:

Here, the objective function is of the form:

Here, the vectors are the training data points for 1≤i≤n, and are their corresponding labels that we want to predict eventually. We call the method linear if L(w;x,y) can be expressed as a function of wTx and y.

The objective function f has two components:

  • A regularizer that controls the complexity of the model
  • The loss that measures the error of the model on the training data

The loss function L(w;) is typically a convex function in w. The fixed regularization parameter λ≥0 defines the trade-off between the two goals of minimizing the loss on the training error and minimizing model complexity to avoid overfitting. Throughout the chapters, we will learn in details on different learning types and algorithms.

On the other hand, deep neural networks (DNN) form the core of deep learning (DL) by providing algorithms to model complex and high-level abstractions in data and can better exploit large-scale datasets to build complex models

There are some widely used deep learning architectures based on artificial neural networks: DNNs, Capsule Networks, Restricted Boltzmann Machines, deep belief networks, factorization machines and recurrent neural networks.

These architectures have been widely used in computer vision, speech recognition, natural language processing, audio recognition, social network filtering, machine translation, bioinformatics and drug design. Throughout the chapters, we will see several real-life examples using these architectures to achieve state-of-the art predictive accuracy.

Typical machine learning workflow

A typical ML application involves several processing steps, from the input to the output, forming a scientific workflow as shown in Figure 1, ML workflow. The following steps are involved in a typical ML application:

  1. Load the data
  2. Parse the data into the input format for the algorithm
  3. Pre-process the data and handle the missing values
  4. Split the data into three sets, for training, testing, and validation (train set and validation set respectively) and one for testing the model (test dataset)
  5. Run the algorithm to build and train your ML model
  6. Make predictions with the training data and observe the results
  7. Test and evaluate the model with the test data or alternatively validate the model using some cross-validator technique using the third dataset called a validation dataset
  8. Tune the model for better performance and accuracy
  9. Scale up the model so that it can handle massive datasets in future
  10. Deploy the ML model in production:
Figure 1: ML workflow

The preceding workflow is represent a few steps to solve ML problems. Where, ML tasks can be broadly categorized into supervised, unsupervised, semi-supervised, reinforcement, and recommendation systems. The following Figure 2, Supervised learning in action, shows the schematic diagram of supervised learning. After the algorithm has found the required patterns, those patterns can be used to make predictions for unlabeled test data:

Figure 2: Supervised learning in action

Examples include classification and regression for solving supervised learning problems so that predictive models can be built for predictive analytics based on them. Throughout the upcoming chapters, we will provide several examples of supervised learning, such as LR, logistic regression, random forest, decision trees, Naive Bayes, multilayer perceptron, and so on.

A regression algorithm is meant to produce continuous output. The input is allowed to be either discrete or continuous:

Figure 3: A regression algorithm is meant to produce continuous output

A classification algorithm, on the other hand, is meant to produce discrete output from an input of a set of discrete or continuous values. This distinction is important to know because discrete-valued outputs are handled better by classification, which will be discussed in upcoming chapters:

Figure 4: A classification algorithm is meant to produce discrete output

In this chapter, we will mainly focus on the supervised regression algorithms. We will start with describing the problem statement and then we move on to the very simple LR algorithm. Often, performance of these ML models is optimized using hyperparameter tuning and cross-validation techniques. So knowing them, in brief, is mandatory so that we can easily use them in future chapters.

Hyperparameter tuning and cross-validation

Tuning an algorithm is simply a process that one goes through in order to enable the algorithm to perform optimally in terms of runtime and memory usage. In Bayesian statistics, a hyperparameter is a parameter of a prior distribution. In terms of ML, the term hyperparameter refers to those parameters that cannot be directly learned from the regular training process.

Hyperparameters are usually fixed before the actual training process begins. This is done by setting different values for those hyperparameters, training different models, and deciding which ones work best by testing them. Here are some typical examples of such parameters:

  • Number of leaves, bins, or depth of a tree
  • Number of iterations
  • Number of latent factors in a matrix factorization
  • Learning rate
  • Number of hidden layers in a deep neural network
  • The number of clusters in k-means clustering and so on

In short, hyperparameter tuning is a technique for choosing the right combination of hyperparameters based on the performance of presented data. It is one of the fundamental requirements for obtaining meaningful and accurate results from ML algorithms in practice. The following figure shows the model tuning process, things to consider, and workflow:

Figure 5: Model tuning process

Cross-validation (also known as rotation estimation) is a model validation technique for assessing the quality of the statistical analysis and results. The target is to make the model generalized toward an independent test set. It will help if you want to estimate how a predictive model will perform accurately in practice when you deploy it as an ML application. During the cross-validation process, a model is usually trained with a dataset of a known type.

Conversely, it is tested using a dataset of an unknown type. In this regard, cross-validation helps to describe a dataset to test the model in the training phase using the validation set. There are two types of cross-validation that can be typed as follows:

  • Exhaustive cross-validation: This includes leave-p-out cross-validation and leave-one-out cross-validation
  • Non-exhaustive cross-validation: This includes K-fold cross-validation and repeated random subsampling cross-validation

In most cases, the researcher/data scientist/data engineer uses 10-fold cross-validation instead of testing on a validation set (see more in Figure 6, 10-fold cross-validation technique). This is the most widely used cross-validation technique across all use cases and problem types, as explained by the following figure.

Basically, using this technique, your complete training data is split into a number of folds. This parameter can be specified. Then the whole pipeline is run once for every fold and one ML model is trained for each fold. Finally, the different ML models obtained are joined by a voting scheme for classifiers or by averaging for regression:

Figure 6: 10-fold cross-validation technique

Moreover, to reduce the variability, multiple iterations of cross-validation are performed using different partitions; finally, the validation results are averaged over the rounds.

Analyzing and predicting insurance severity claims

Predicting the cost, and hence the severity, of claims in an insurance company is a real-life problem that needs to be solved in a more accurate and automated way. We will do something similar in this example.

We will start with simple logistic regression and will learn how to improve the performance using some ensemble techniques, such as an random forest regressor. Then we will look at how to boost the performance with a gradient boosted regressor. Finally, we will show how to choose the best model and deploy it for a production-ready environment.

Motivation

When someone is devastated by a serious car accident, his focus is on his life, family, child, friends, and loved ones. However, once a file is submitted for the insurance claim, the overall paper-based process to calculate the severity claim is a tedious task to be completed.

This is why insurance companies are continually seeking fresh ideas to improve their claims service for their clients in an automated way. Therefore, predictive analytics is a viable solution to predicting the cost, and hence severity, of claims on the available and historical data.

Description of the dataset

A dataset from the Allstate Insurance company will be used, which consists of more than 300,000 examples with masked and anonymous data and consisting of more than 100 categorical and numerical attributes, thus being compliant with confidentiality constraints, more than enough for building and evaluating a variety of ML techniques.

The dataset is downloaded from the Kaggle website at https://www.kaggle.com/c/allstate-claims-severity/data. Each row in the dataset represents an insurance claim. Now, the task is to predict the value for the loss column. Variables prefaced with cat are categorical, while those prefaced with cont are continuous.

It is to be noted that the Allstate Corporation is the second largest insurance company in the United States, founded in 1931. We are trying to make the whole thing automated, to predict the cost, and hence the severity, of accident and damage claims.

Exploratory analysis of the dataset

Let's look at some data properties (use the EDA.scala file for this). At first, we need to read the training set to see the available properties. To begin with, let's place your training set in your project directory or somewhere else and point to it accordingly:

val train = "data/insurance_train.csv"

I hope you have Java, Scala and Spark installed and configured on your machine. If not, please do so. Anyway, I'm assuming they are. So let's create an active Spark session, which is the gateway for any Spark application:

val spark = SparkSessionCreate.createSession()
import spark.implicits._
Spark session alias on Scala REPL:
If you are inside Scala REPL, the Spark session alias spark is already defined, so just get going.

Here, I have a method called createSession() under the class SparkSessionCreate that goes as follows:

import org.apache.spark.sql.SparkSession 

object SparkSessionCreate { def createSession(): SparkSession = { val spark = SparkSession .builder .master("local[*]") // adjust accordingly .config("spark.sql.warehouse.dir", "E:/Exp/") //change accordingly .appName("MySparkSession") //change accordingly .getOrCreate() return spark }
}

Since this will be used frequently throughout this book, I decided to create a dedicated method. So, let's load, parse, and create a DataFrame using the read.csv method but in Databricks .csv format (as known as com.databricks.spark.csv) since our dataset comes with .csv format.

At this point, I have to interrupt you to inform something very useful. Since we will be using Spark MLlib and ML APIs in upcoming chapters too. Therefore, it would be worth fixing some issues in prior. If you're a Windows user then let me tell you a very weired issue that you will be experiencing while working with Spark.

Well, the thing is that Spark works on Windows, Mac OS, and Linux. While using Eclipse or IntelliJ IDEA to develop your Spark applications (or through Spark local job sumit) on Windows, you might face an I/O exception error and consequently your application might not compile successfully or may be interrupted.

The reason is that Spark expects that there is a runtime environment for Hadoop on Windows. Unfortunately, the binary distribution of Spark  (v2.2.0 for example) release does not contain some Windows native components (example, winutils.exe, hadoop.dll, and so on). However, these are required (not optional) to run Hadoop on Windows. Therefore, if you cannot ensure the runtime environment, an I/O exception saying the following:

24/01/2018 11:11:10 
ERROR util.Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Now there are two ways to tackale this issue on Windows:

  1. From IDE such as Eclipse and IntelliJ IDEA: Download the winutls.exe from https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin/. Then download and copy it inside the bin folder in the Spark distribution—example, spark-2.2.0-bin-hadoop2.7/bin/ . Then select the project | Run Configurations... |  Environment | New | create a variable named HADOOP_HOME and put the path in the value field—example, c:/spark-2.2.0-bin-hadoop2.7/bin/ | OKApplyRun. Then you're done!
  2. With local Spark job submit: Add the winutils.exe file path to the hadoop home directory using System set properties—example, in the Spark code System.setProperty("hadoop.home.dir", "c:\\\spark-2.2.0-bin-hadoop2.7\\\bin\winutils.exe")

Alright, let's come to your original discussion. If you see the preceding code block then we set to read the header of the CSV file, which is directly applied to the column names of the DataFrame created, and the inferSchema property is set to true. If you don't specify the inferSchema configuration explicitly, the float values will be treated as strings. This might cause VectorAssembler to raise an exception such as java.lang.IllegalArgumentException: Data type StringType is not supported:

 val trainInput = spark.read 
    .option("header", "true") 
    .option("inferSchema", "true") 
    .format("com.databricks.spark.csv") 
    .load(train) 
    .cache 

Now let's print the schema of the DataFrame we just created. I have abridged the output and shown only a few columns:

Println(trainInput.printSchema()) 
root |-- id: integer (nullable = true) |-- cat1: string (nullable = true) |-- cat2: string (nullable = true) |-- cat3: string (nullable = true) ... |-- cat115: string (nullable = true) |-- cat116: string (nullable = true)
... |-- cont14: double (nullable = true) |-- loss: double (nullable = true)

You can see that there are 116 categorical columns for categorical features. Also, there are 14 numerical feature columns. Now let's see how many rows there are in the dataset using the count() method:

println(df.count())
>>>
188318

The preceding number is pretty high for training an ML model. Alright, now let's see a snapshot of the dataset using the show() method but with only some selected columns so that it makes more sense. Feel free to use df.show() to see all columns:

df.select("id", "cat1", "cat2", "cat3", "cont1", "cont2", "cont3", "loss").show() 
>>> 

Nevertheless, if you look at all the rows using df.show(), you will see some categorical columns containing too many categories. To be more specific, category columns cat109 to cat116 contain too many categories, as follows:

df.select("cat109", "cat110", "cat112", "cat113", "cat116").show() 
>>> 

In later stages, it would be worth dropping these columns to remove the skewness in the dataset. It is to be noted that in statistics, skewness is a measure of the asymmetry of the probability distribution of a real-valued random variable with respect to the mean.

Now that we have seen a snapshot of the dataset, it is worth seeing some other statistics such as average claim or loss, minimum, maximum loss, and many more, using Spark SQL. But before that, let's rename the last column from loss to label since the ML model will complain about it. Even after using the setLabelCol on the regression model, it still looks for a column called label. This results in a disgusting error saying org.apache.spark.sql.AnalysisException: cannot resolve 'label' given input columns:

val newDF = df.withColumnRenamed("loss", "label") 

Now, since we want to execute an SQL query, we need to create a temporary view so that the operation can be performed in-memory:

newDF.createOrReplaceTempView("insurance") 

Now let's average the damage claimed by the clients:

spark.sql("SELECT avg(insurance.label) as AVG_LOSS FROM insurance").show()
>>>
+------------------+
| AVG_LOSS |
+------------------+
|3037.3376856699924|
+------------------+

Similarly, let's see the lowest claim made so far:

spark.sql("SELECT min(insurance.label) as MIN_LOSS FROM insurance").show() 
>>>  
+--------+
|MIN_LOSS|
+--------+
| 0.67|
+--------+

And let's see the highest claim made so far:

spark.sql("SELECT max(insurance.label) as MAX_LOSS FROM insurance").show() 
>>> 
+---------+
| MAX_LOSS|
+---------+
|121012.25|
+---------+

Since Scala or Java does not come with a handy visualization library, I could not something else but now let's focus on the data preprocessing before we prepare our training set. 

Data preprocessing

Now that we have looked at some data properties, the next task is to do some preprocessing, such as cleaning, before getting the training set. For this part, use the Preprocessing.scala file. For this part, the following imports are required:

import org.apache.spark.ml.feature.{ StringIndexer, StringIndexerModel}
import org.apache.spark.ml.feature.VectorAssembler

Then we load both the training and the test set as shown in the following code:

var trainSample = 1.0 
var testSample = 1.0 
val train = "data/insurance_train.csv" 
val test = "data/insurance_test.csv" 
val spark = SparkSessionCreate.createSession()
import spark.implicits._ println("Reading data from " + train + " file")

val trainInput = spark.read .option("header", "true") .option("inferSchema", "true") .format("com.databricks.spark.csv") .load(train) .cache val testInput = spark.read .option("header", "true") .option("inferSchema", "true") .format("com.databricks.spark.csv") .load(test) .cache

The next task is to prepare the training and test set for our ML model to be learned. In the preceding DataFrame out of the training dataset, we renamed the loss to label. Then the content of train.csv was split into training and (cross) validation data, 75% and 25%, respectively.

The content of test.csv is used for evaluating the ML model. Both original DataFrames are also sampled, which is particularly useful for running fast executions on your local machine:

println("Preparing data for training model") 
var data = trainInput.withColumnRenamed("loss", "label").sample(false, trainSample) 

We also should do null checking. Here, I have used a naïve approach. The thing is that if the training DataFrame contains any null values, we completely drop those rows. This makes sense since a few rows out of 188,318 do no harm. However, feel free to adopt another approach such as null value imputation:

var DF = data.na.drop() 
if (data == DF) println("No null values in the DataFrame") else{ println("Null values exist in the DataFrame") data = DF }
val seed = 12345L val splits = data.randomSplit(Array(0.75, 0.25), seed) val (trainingData, validationData) = (splits(0), splits(1))

Then we cache both the sets for faster in-memory access:

trainingData.cache 
validationData.cache 

Additionally, we should perform the sampling of the test set that will be required in the evaluation step:

val testData = testInput.sample(false, testSample).cache 

Since the training set contains both the numerical and categorical values, we need to identify and treat them separately. First, let's identify only the categorical column:

def isCateg(c: String): Boolean = c.startsWith("cat") 
def categNewCol(c: String): String = if (isCateg(c)) s"idx_${c}" else c 

Then, the following method is used to remove categorical columns with too many categories, which we already discussed in the preceding section:

def removeTooManyCategs(c: String): Boolean = !(c matches "cat(109$|110$|112$|113$|116$)")

Now the following method is used to select only feature columns. So essentially, we should remove the ID (since the ID is just the identification number of the clients, it does not carry any non-trivial information) and the label column:

def onlyFeatureCols(c: String): Boolean = !(c matches "id|label") 

Well, so far we have treated some bad columns that are either trivial or not needed at all. Now the next task is to construct the definitive set of feature columns:

val featureCols = trainingData.columns 
    .filter(removeTooManyCategs) 
    .filter(onlyFeatureCols) 
    .map(categNewCol) 
StringIndexer encodes a given string column of labels to a column of label indices. If the input column is numeric in nature, we cast it to string using the StringIndexer and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Now we need to use the StringIndexer() for categorical columns:

val stringIndexerStages = trainingData.columns.filter(isCateg) 
      .map(c => new StringIndexer() 
      .setInputCol(c) 
      .setOutputCol(categNewCol(c)) 
      .fit(trainInput.select(c).union(testInput.select(c)))) 

Note that this is not an efficient approach. An alternative approach would be using a OneHotEncoder estimator.

OneHotEncoder maps a column of label indices to a column of binary vectors, with a single one-value at most. This encoding permits algorithms that expect continuous features, such as logistic regression, to utilize categorical features.

Now let's use the VectorAssembler() to transform a given list of columns into a single vector column:

val assembler = new VectorAssembler() 
    .setInputCols(featureCols) 
    .setOutputCol("features")
VectorAssembler is a transformer. It combines a given list of columns into a single vector column. It is useful for combining the raw features and features generated by different feature transformers into one feature vector, in order to train ML models such as logistic regression and decision trees.

That's all we need before we start training the regression models. First, we start training the LR model and evaluate the performance.

LR for predicting insurance severity claims

As you have already seen, the loss to be predicted contains continuous values, that is, it will be a regression task. So in using regression analysis here, the goal is to predict a continuous target variable, whereas another area called classification predicts a label from a finite set.

Logistic regression (LR) belongs to the family of regression algorithms. The goal of regression is to find relationships and dependencies between variables. It models the relationship between a continuous scalar dependent variable y (that is, label or target) and one or more (a D-dimensional vector) explanatory variable (also independent variables, input variables, features, observed data, observations, attributes, dimensions, and data points) denoted as x using a linear function:

Figure 9: A regression graph separates data points (in red dots) and the blue line is regression

LR models the relationship between a dependent variable y, which involves a linear combination of interdependent variables xi. The letters A and B represent constants that describe the y axis intercept and the slope of the line respectively:

y = A+Bx

Figure 9, Regression graph separates data points (in red dots) and the blue line is regression shows an example of simple LR with one independent variable—that is, a set of data points and a best fit line, which is the result of the regression analysis itself. It can be observed that the line does not actually pass through all of the points.

The distance between any data points (measured) and the line (predicted) is called the regression error. Smaller errors contribute to more accurate results in predicting unknown values. When the errors are reduced to their smallest levels possible, the line of best fit is created for the final regression error. Note that there are no single metrics in terms of regression errors; there are several as follows:

  • Mean Squared Error (MSE): It is a measure of how close a fitted line is to data points. The smaller the MSE, the closer the fit is to the data.
  • Root Mean Squared Error (RMSE): It is the square root of the MSE but probably the most easily interpreted statistic, since it has the same units as the quantity plotted on the vertical axis.
  • R-squared: R-squared is a statistical measure of how close the data is to the fitted regression line. R-squared is always between 0 and 100%. The higher the R-squared, the better the model fits your data.
  • Mean Absolute Error (MAE): MAE measures the average magnitude of the errors in a set of predictions without considering their direction. It's the average over the test sample of the absolute differences between prediction and actual observation where all individual differences have equal weight.
  • Explained variance: In statistics, explained variation measures the proportion to which a mathematical model accounts for the variation of a given dataset.

Developing insurance severity claims predictive model using LR

In this sub-section, we will develop a predictive analytics model for predicting accidental loss against the severity claim by clients. We start with importing required libraries:

import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel} 
import org.apache.spark.ml.{ Pipeline, PipelineModel } 
import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.tuning.ParamGridBuilder 
import org.apache.spark.ml.tuning.CrossValidator 
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.mllib.evaluation.RegressionMetrics 

Then we create an active Spark session as the entry point to the application. In addition, importing implicits__ required for implicit conversions like converting RDDs to DataFrames.

val spark = SparkSessionCreate.createSession() 
import spark.implicits._ 

Then we define some hyperparameters, such as the number of folds for cross-validation, the number of maximum iterations, the value of the regression parameter, the value of tolerance, and elastic network parameters, as follows:

val numFolds = 10 
val MaxIter: Seq[Int] = Seq(1000) 
val RegParam: Seq[Double] = Seq(0.001) 
val Tol: Seq[Double] = Seq(1e-6) 
val ElasticNetParam: Seq[Double] = Seq(0.001) 

Well, now we create an LR estimator:

val model = new LinearRegression()
.setFeaturesCol("features")
.setLabelCol("label")

Now let's build a pipeline estimator by chaining the transformer and the LR estimator:

println("Building ML pipeline") 
val pipeline = new Pipeline()
.setStages((Preproessing.stringIndexerStages
:+ Preproessing.assembler) :+ model)
Spark ML pipelines have the following components:
  • DataFrame: Used as the central data store where all the original data and intermediate results are stored.
  • Transformer: A transformer transforms one DataFrame into another by adding additional feature columns. Transformers are stateless, meaning that they don't have any internal memory and behave exactly the same each time they are used.
  • Estimator: An estimator is some sort of ML model. In contrast to a transformer, an estimator contains an internal state representation and is highly dependent on the history of the data that it has already seen.
  • Pipeline: Chains the preceding components, DataFrame, Transformer, and Estimator together.
  • Parameter: ML algorithms have many knobs to tweak. These are called hyperparameters, and the values learned by a ML algorithm to fit data are called parameters.

Before we start performing the cross-validation, we need to have a paramgrid. So let's start creating the paramgrid by specifying the number of maximum iterations, the value of the regression parameter, the value of tolerance, and Elastic network parameters as follows:

val paramGrid = new ParamGridBuilder() 
      .addGrid(model.maxIter, MaxIter) 
      .addGrid(model.regParam, RegParam) 
      .addGrid(model.tol, Tol) 
      .addGrid(model.elasticNetParam, ElasticNetParam) 
      .build() 

Now, for a better and stable performance, let's prepare the K-fold cross-validation and grid search as a part of model tuning. As you can probably guess, I am going to perform 10-fold cross-validation. Feel free to adjust the number of folds based on your settings and dataset:

println("Preparing K-fold Cross Validation and Grid Search: Model tuning") 
val cv = new CrossValidator() 
      .setEstimator(pipeline) 
      .setEvaluator(new RegressionEvaluator) 
      .setEstimatorParamMaps(paramGrid) 
      .setNumFolds(numFolds) 

Fantastic - we have created the cross-validation estimator. Now it's time to train the LR model:

println("Training model with Linear Regression algorithm") 
val cvModel = cv.fit(Preproessing.trainingData) 

Now that we have the fitted model, that means it is now capable of making predictions. So let's start evaluating the model on the train and validation set and calculating RMSE, MSE, MAE, R-squared, and many more:

println("Evaluating model on train and validation set and calculating RMSE") 
val trainPredictionsAndLabels = cvModel.transform(Preproessing.trainingData)
.select("label", "prediction")
.map { case Row(label: Double, prediction: Double)
=> (label, prediction) }.rdd val validPredictionsAndLabels = cvModel.transform(Preproessing.validationData)
.select("label", "prediction")
.map { case Row(label: Double, prediction: Double)
=> (label, prediction) }.rdd val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels) val validRegressionMetrics = new RegressionMetrics(validPredictionsAndLabels)

Great! We have managed to compute the raw prediction on the train and the test set. Let's hunt for the best model:

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel] 

Once we have the best fitted and cross-validated model, we can expect good prediction accuracy. Now let's observe the results on the train and the validation set:

val results = "n=====================================================================n" + s"Param trainSample: ${Preproessing.trainSample}n" + 
      s"Param testSample: ${Preproessing.testSample}n" + 
      s"TrainingData count: ${Preproessing.trainingData.count}n" + 
      s"ValidationData count: ${Preproessing.validationData.count}n" + 
      s"TestData count: ${Preproessing.testData.count}n" +      "=====================================================================n" +   s"Param maxIter = ${MaxIter.mkString(",")}n" + 
      s"Param numFolds = ${numFolds}n" +      "=====================================================================n" +   s"Training data MSE = ${trainRegressionMetrics.meanSquaredError}n" + 
      s"Training data RMSE = ${trainRegressionMetrics.rootMeanSquaredError}n" + 
      s"Training data R-squared = ${trainRegressionMetrics.r2}n" + 
      s"Training data MAE = ${trainRegressionMetrics.meanAbsoluteError}n" + 
      s"Training data Explained variance = ${trainRegressionMetrics.explainedVariance}n" +      "=====================================================================n" +   s"Validation data MSE = ${validRegressionMetrics.meanSquaredError}n" + 
      s"Validation data RMSE = ${validRegressionMetrics.rootMeanSquaredError}n" + 
      s"Validation data R-squared = ${validRegressionMetrics.r2}n" + 
      s"Validation data MAE = ${validRegressionMetrics.meanAbsoluteError}n" + 
      s"Validation data Explained variance = ${validRegressionMetrics.explainedVariance}n" + 
      s"CV params explained: ${cvModel.explainParams}n" + 
      s"LR params explained: ${bestModel.stages.last.asInstanceOf[LinearRegressionModel].explainParams}n" +      "=====================================================================n" 

Now, we print the preceding results as follows:

println(results)
>>> Building Machine Learning pipeline Reading data from data/insurance_train.csv file Null values exist in the DataFrame Training model with Linear Regression algorithm ===================================================================== Param trainSample: 1.0 Param testSample: 1.0 TrainingData count: 141194 ValidationData count: 47124 TestData count: 125546 ===================================================================== Param maxIter = 1000 Param numFolds = 10 ===================================================================== Training data MSE = 4460667.3666198505 Training data RMSE = 2112.0292059107164 Training data R-squared = -0.1514435541595276 Training data MAE = 1356.9375609756164 Training data Explained variance = 8336528.638733305 ===================================================================== Validation data MSE = 4839128.978963534 Validation data RMSE = 2199.802031766389 Validation data R-squared = -0.24922962724089603 Validation data MAE = 1356.419484419514 Validation data Explained variance = 8724661.329105612 CV params explained: estimator: estimator for selection (current: pipeline_d5024480c670) estimatorParamMaps: param maps for the estimator (current: [Lorg.apache.spark.ml.param.ParamMap;@2f0c9855) evaluator: evaluator used to select hyper-parameters that maximize the validated metric (current: regEval_00c707fcaa06) numFolds: number of folds for cross validation (>= 2) (default: 3, current: 10) seed: random seed (default: -1191137437) LR params explained: aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2) elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.001) featuresCol: features column name (default: features, current: features) fitIntercept: whether to fit an intercept term (default: true) labelCol: label column name (default: label, current: label) maxIter: maximum number of iterations (>= 0) (default: 100, current: 1000) predictionCol: prediction column name (default: prediction) regParam: regularization parameter (>= 0) (default: 0.0, current: 0.001) solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto) standardization: whether to standardize the training features before fitting the model (default: true) tol: the convergence tolerance for iterative algorithms (>= 0) (default: 1.0E-6, current: 1.0E-6) weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (undefined) =====================================================================

So our predictive model shows an MAE of about 1356.419484419514 for both the training and test set. However, the MAE is much lower on the Kaggle public and private leaderboard (go to: https://www.kaggle.com/c/allstate-claims-severity/leaderboard) with an MAE of 1096.92532 and 1109.70772 respectively.

Wait! We are not done yet. We still need to make a prediction on the test set:

println("Run prediction on the test set") 
cvModel.transform(Preproessing.testData) 
      .select("id", "prediction") 
      .withColumnRenamed("prediction", "loss") 
      .coalesce(1) // to get all the predictions in a single csv file 
      .write.format("com.databricks.spark.csv")
.option("header", "true") .save("output/result_LR.csv")

The preceding code should generate a CSV file named result_LR.csv. If we open the file, we should observe the loss against each ID, that is, claim. We will see the contents for both LR, RF, and GBT at the end of this chapter. Nevertheless, it is always a good idea to stop the Spark session by invoking the spark.stop() method.

An ensemble method is a learning algorithm that creates a model that is composed of a set of other base models. Spark ML supports two major ensemble algorithms called GBT and random forest based on decision trees. We will now see if we can improve the prediction accuracy by reducing the MAE error significantly using GBT.

GBT regressor for predicting insurance severity claims

In order to minimize a loss function, Gradient Boosting Trees (GBTs) iteratively train many decision trees. On each iteration, the algorithm uses the current ensemble to predict the label of each training instance.

Then the raw predictions are compared with the true labels. Thus, in the next iteration, the decision tree will help correct previous mistakes if the dataset is re-labeled to put more emphasis on training instances with poor predictions.

Since we are talking about regression, it would be more meaningful to discuss the regression strength of GBTs and its losses computation. Suppose we have the following settings:

  • N data instances
  • yi = label of instance i
  • xi = features of instance i

Then the F(xi) function is the model's predicted label; for instance, it tries to minimize the error, that is, loss:

Now, similar to decision trees, GBTs also:

  • Handle categorical features (and of course numerical features too)
  • Extend to the multiclass classification setting
  • Perform both the binary classification and regression (multiclass classification is not yet supported)
  • Do not require feature scaling
  • Capture non-linearity and feature interactions, which are greatly missing in LR, such as linear models
Validation while training: Gradient boosting can overfit, especially when you have trained your model with more trees. In order to prevent this issue, it is useful to validate while carrying out the training.

Since we have already prepared our dataset, we can directly jump into implementing a GBT-based predictive model for predicting insurance severity claims. Let's start with importing the necessary packages and libraries:

import org.apache.spark.ml.regression.{GBTRegressor, GBTRegressionModel} 
import org.apache.spark.ml.{Pipeline, PipelineModel} 
import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.tuning.ParamGridBuilder 
import org.apache.spark.ml.tuning.CrossValidator 
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.mllib.evaluation.RegressionMetrics 

Now let's define and initialize the hyperparameters needed to train the GBTs, such as the number of trees, number of max bins, number of folds to be used during cross-validation, number of maximum iterations to iterate the training, and finally max tree depth:

val NumTrees = Seq(5, 10, 15) 
val MaxBins = Seq(5, 7, 9) 
val numFolds = 10 
val MaxIter: Seq[Int] = Seq(10) 
val MaxDepth: Seq[Int] = Seq(10) 

Then, again we instantiate a Spark session and implicits as follows:

val spark = SparkSessionCreate.createSession() 
import spark.implicits._ 

Now that we care an estimator algorithm, that is, GBT:

val model = new GBTRegressor()
.setFeaturesCol("features")
.setLabelCol("label")

Now, we build the pipeline by chaining the transformations and predictor together as follows:

val pipeline = new Pipeline().setStages((Preproessing.stringIndexerStages :+ Preproessing.assembler) :+ model) 

Before we start performing the cross-validation, we need to have a paramgrid. So let's start creating the paramgrid by specifying the number of maximum iteration, max tree depth, and max bins as follows:

val paramGrid = new ParamGridBuilder() 
      .addGrid(model.maxIter, MaxIter) 
      .addGrid(model.maxDepth, MaxDepth) 
      .addGrid(model.maxBins, MaxBins) 
      .build() 

Now, for a better and stable performance, let's prepare the K-fold cross-validation and grid search as a part of model tuning. As you can guess, I am going to perform 10-fold cross-validation. Feel free to adjust the number of folds based on you settings and dataset:

println("Preparing K-fold Cross Validation and Grid Search") 
val cv = new CrossValidator() 
      .setEstimator(pipeline) 
      .setEvaluator(new RegressionEvaluator) 
      .setEstimatorParamMaps(paramGrid) 
      .setNumFolds(numFolds) 

Fantastic, we have created the cross-validation estimator. Now it's time to train the GBT model:

println("Training model with GradientBoostedTrees algorithm ") 
val cvModel = cv.fit(Preproessing.trainingData) 

Now that we have the fitted model, that means it is now capable of making predictions. So let's start evaluating the model on the train and validation set, and calculating RMSE, MSE, MAE, R-squared, and so on:

println("Evaluating model on train and test data and calculating RMSE") 
val trainPredictionsAndLabels = cvModel.transform(Preproessing.trainingData).select("label", "prediction").map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd 

val validPredictionsAndLabels = cvModel.transform(Preproessing.validationData).select("label", "prediction").map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels) val validRegressionMetrics = new RegressionMetrics(validPredictionsAndLabels)

Great! We have managed to compute the raw prediction on the train and the test set. Let's hunt for the best model:

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel] 

As already stated, by using GBT it is possible to measure feature importance so that at a later stage we can decide which features are to be used and which ones are to be dropped from the DataFrame. Let's find the feature importance of the best model we just created previously, for all features in ascending order as follows:

val featureImportances = bestModel.stages.last.asInstanceOf[GBTRegressionModel].featureImportances.toArray 
val FI_to_List_sorted = featureImportances.toList.sorted.toArray  

Once we have the best fitted and cross-validated model, we can expect good prediction accuracy. Now let's observe the results on the train and the validation set:

val output = "n=====================================================================n" + s"Param trainSample: ${Preproessing.trainSample}n" + 
      s"Param testSample: ${Preproessing.testSample}n" + 
      s"TrainingData count: ${Preproessing.trainingData.count}n" + 
      s"ValidationData count: ${Preproessing.validationData.count}n" + 
      s"TestData count: ${Preproessing.testData.count}n" +      "=====================================================================n" +   s"Param maxIter = ${MaxIter.mkString(",")}n" + 
      s"Param maxDepth = ${MaxDepth.mkString(",")}n" + 
      s"Param numFolds = ${numFolds}n" +      "=====================================================================n" +   s"Training data MSE = ${trainRegressionMetrics.meanSquaredError}n" + 
      s"Training data RMSE = ${trainRegressionMetrics.rootMeanSquaredError}n" + 
      s"Training data R-squared = ${trainRegressionMetrics.r2}n" + 
      s"Training data MAE = ${trainRegressionMetrics.meanAbsoluteError}n" + 
      s"Training data Explained variance = ${trainRegressionMetrics.explainedVariance}n" +      "=====================================================================n" +    s"Validation data MSE = ${validRegressionMetrics.meanSquaredError}n" + 
      s"Validation data RMSE = ${validRegressionMetrics.rootMeanSquaredError}n" + 
      s"Validation data R-squared = ${validRegressionMetrics.r2}n" + 
      s"Validation data MAE = ${validRegressionMetrics.meanAbsoluteError}n" + 
      s"Validation data Explained variance = ${validRegressionMetrics.explainedVariance}n" +      "=====================================================================n" +   s"CV params explained: ${cvModel.explainParams}n" + 
      s"GBT params explained: ${bestModel.stages.last.asInstanceOf[GBTRegressionModel].explainParams}n" + s"GBT features importances:n ${Preproessing.featureCols.zip(FI_to_List_sorted).map(t => s"t${t._1} = ${t._2}").mkString("n")}n" +      "=====================================================================n" 

Now, we print the preceding results as follows:

println(results)
>>> ===================================================================== Param trainSample: 1.0 Param testSample: 1.0 TrainingData count: 141194 ValidationData count: 47124 TestData count: 125546 ===================================================================== Param maxIter = 10 Param maxDepth = 10 Param numFolds = 10 ===================================================================== Training data MSE = 2711134.460296872 Training data RMSE = 1646.5522950385973 Training data R-squared = 0.4979619968485668 Training data MAE = 1126.582534126603 Training data Explained variance = 8336528.638733303 ===================================================================== Validation data MSE = 4796065.983773314 Validation data RMSE = 2189.9922337244293 Validation data R-squared = 0.13708582379658474 Validation data MAE = 1289.9808960385383 Validation data Explained variance = 8724866.468978886 ===================================================================== CV params explained: estimator: estimator for selection (current: pipeline_9889176c6eda) estimatorParamMaps: param maps for the estimator (current: [Lorg.apache.spark.ml.param.ParamMap;@87dc030) evaluator: evaluator used to select hyper-parameters that maximize the validated metric (current: regEval_ceb3437b3ac7) numFolds: number of folds for cross validation (>= 2) (default: 3, current: 10) seed: random seed (default: -1191137437) GBT params explained: cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. (default: false) checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations (default: 10) featuresCol: features column name (default: features, current: features) impurity: Criterion used for information gain calculation (case-insensitive). Supported options: variance (default: variance) labelCol: label column name (default: label, current: label) lossType: Loss function which GBT tries to minimize (case-insensitive). Supported options: squared, absolute (default: squared) maxBins: Max number of bins for discretizing continuous features. Must be >=2 and >= number of categories for any categorical feature. (default: 32) maxDepth: Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. (default: 5, current: 10) maxIter: maximum number of iterations (>= 0) (default: 20, current: 10) maxMemoryInMB: Maximum memory in MB allocated to histogram aggregation. (default: 256) minInfoGain: Minimum information gain for a split to be considered at a tree node. (default: 0.0) minInstancesPerNode: Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1. (default: 1) predictionCol: prediction column name (default: prediction) seed: random seed (default: -131597770) stepSize: Step size (a.k.a. learning rate) in interval (0, 1] for shrinking the contribution of each estimator. (default: 0.1) subsamplingRate: Fraction of the training data used for learning each decision tree, in range (0, 1]. (default: 1.0) GBT features importance: idx_cat1 = 0.0 idx_cat2 = 0.0 idx_cat3 = 0.0 idx_cat4 = 3.167169394850417E-5 idx_cat5 = 4.745749854188828E-5 ... idx_cat111 = 0.018960701085054904 idx_cat114 = 0.020609596772820878 idx_cat115 = 0.02281267960792931 cont1 = 0.023943087007850663 cont2 = 0.028078353534251005 ... cont13 = 0.06921704925937068 cont14 = 0.07609111789104464 =====================================================================

So our predictive model shows an MAE of about 1126.582534126603 and 1289.9808960385383 for the training and test sets respectively. The last result is important for understanding the feature importance (the preceding list is abridged to save space but you should receive the full list). Especially, we can see that the first three features are not important at all so we can safely drop them from the DataFrame. We will provide more insight in the next section.

Now finally, let us run the prediction over the test set and generate the predicted loss for each claim from the clients:

println("Run prediction over test dataset") 
cvModel.transform(Preproessing.testData) 
      .select("id", "prediction") 
      .withColumnRenamed("prediction", "loss") 
      .coalesce(1) 
      .write.format("com.databricks.spark.csv") 
      .option("header", "true") 
      .save("output/result_GBT.csv") 

The preceding code should generate a CSV file named result_GBT.csv. If we open the file, we should observe the loss against each ID, that is, claim. We will see the contents for both LR, RF, and GBT at the end of this chapter. Nevertheless, it is always a good idea to stop the Spark session by invoking the spark.stop() method.

Boosting the performance using random forest regressor

In the previous sections, we did not experience the expected MAE value although we got predictions of the severity loss in each instance. In this section, we will develop a more robust predictive analytics model for the same purpose but use an random forest regressor. However, before diving into its formal implementation, a short overview of the random forest algorithm is needed.

Random Forest for classification and regression

Random Forest is an ensemble learning technique used for solving supervised learning tasks, such as classification and regression. An advantageous feature of Random Forest is that it can overcome the overfitting problem across its training dataset. A forest in Random Forest usually consists of hundreds of thousands of trees. These trees are actually trained on different parts of the same training set.

More technically, an individual tree that grows very deep tends to learn from highly unpredictable patterns. This creates overfitting problems on the training sets. Moreover, low biases make the classifier a low performer even if your dataset quality is good in terms of the features presented. On the other hand, an Random Forest helps to average multiple decision trees together with the goal of reducing the variance to ensure consistency by computing proximities between pairs of cases.

GBT or Random Forest? Although both GBT and Random Forest are ensembles of trees, the training processes are different. There are several practical trade-offs that exist, which often poses the dilemma of which one to choose. However, Random Forest would be the winner in most cases. Here are some justifications:
  • GBTs train one tree at a time, but Random Forest can train multiple trees in parallel. So the training time is lower for RF. However, in some special cases, training and using a smaller number of trees with GBTs is easier and quicker.
  • RFs are less prone to overfitting in most cases, so it reduces the likelihood of overfitting. In other words, Random Forest reduces variance with more trees, but GBTs reduce bias with more trees.
  • Finally, Random Forest can be easier to tune since performance improves monotonically with the number of trees, but GBT performs badly with an increased number of trees.

However, this slightly increases bias and makes it harder to interpret the results. But eventually, the performance of the final model increases dramatically. While using the Random Forest as a classifier, there are some parameter settings:

  • If the number of trees is 1, then no bootstrapping is used at all; however, if the number of trees is > 1, then bootstrapping is needed. The supported values are auto, all, sqrt, log2, and onethird.
  • The supported numerical values are (0.0-1.0) and [1-n]. However, if featureSubsetStrategy is chosen as auto, the algorithm chooses the best feature subset strategy automatically.
  • If the numTrees == 1, the featureSubsetStrategy is set to be all. However, if the numTrees > 1 (that is, forest), the featureSubsetStrategy is set to be sqrt for classification.
  • Moreover, if a real value n is set in the range of (0, 1.0), n*number_of_features will be used. However, if an integer value n is in the range (1, the number of features) is set, only n features are used alternatively.
  • The parameter categoricalFeaturesInfo is a map used for storing arbitrary or of categorical features. An entry (n -> k) indicates that feature n is categorical with I categories indexed from 0: (0, 1,...,k-1).
  • The impurity criterion is used for information gain calculation. The supported values are gini and variance for classification and regression respectively.
  • The maxDepth is the maximum depth of the tree (for example, depth 0 means one leaf node, depth 1 means one internal node plus two leaf nodes).
  • The maxBins signifies the maximum number of bins used for splitting the features, where the suggested value is 100 to get better results.
  • Finally, the random seed is used for bootstrapping and choosing feature subsets to avoid the random nature of the results.

As already mentioned, since Random Forest is fast and scalable enough for a large-scale dataset, Spark is a suitable technology to implement the RF, and to implement this massive scalability. However, if the proximities are calculated, storage requirements also grow exponentially.

Well, that's enough about RF. Now it's time to get our hands dirty, so let's get started. We begin with importing required libraries:

import org.apache.spark.ml.regression.{RandomForestRegressor, RandomForestRegressionModel} 
import org.apache.spark.ml.{ Pipeline, PipelineModel } 
import org.apache.spark.ml.evaluation.RegressionEvaluator 
import org.apache.spark.ml.tuning.ParamGridBuilder 
import org.apache.spark.ml.tuning.CrossValidator 
import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.mllib.evaluation.RegressionMetrics 

Then we create an active Spark session and import implicits:

val spark = SparkSessionCreate.createSession() 
import spark.implicits._ 

Then we define some hyperparameters, such as the number of folds for cross-validation, number of maximum iterations, the value of regression parameters, value of tolerance, and elastic network parameters, as follows:

val NumTrees = Seq(5,10,15)  
val MaxBins = Seq(23,27,30)  
val numFolds = 10  
val MaxIter: Seq[Int] = Seq(20) 
val MaxDepth: Seq[Int] = Seq(20) 

Note that for an Random Forest based on a decision tree, we require maxBins to be at least as large as the number of values in each categorical feature. In our dataset, we have 110 categorical features with 23 distinct values. Considering this, we have to set MaxBins to at least 23. Nevertheless, feel free to play with the previous parameters too. Alright, now it's time to create an LR estimator:

val model = new RandomForestRegressor().setFeaturesCol("features").setLabelCol("label")

Now let's build a pipeline estimator by chaining the transformer and the LR estimator:

println("Building ML pipeline") 
val pipeline = new Pipeline().setStages((Preproessing.stringIndexerStages :+ Preproessing.assembler) :+ model) 

Before we start performing the cross-validation, we need to have a paramgrid. So let's start creating the paramgrid by specifying the number of trees, a number for maximum tree depth, and the number of maximum bins parameters, as follows:

val paramGrid = new ParamGridBuilder() 
      .addGrid(model.numTrees, NumTrees) 
      .addGrid(model.maxDepth, MaxDepth) 
      .addGrid(model.maxBins, MaxBins) 
      .build() 

Now, for better and stable performance, let's prepare the K-fold cross-validation and grid search as a part of model tuning. As you can probably guess, I am going to perform 10-fold cross-validation. Feel free to adjust the number of folds based on your settings and dataset:

println("Preparing K-fold Cross Validation and Grid Search: Model tuning") 
val cv = new CrossValidator() 
      .setEstimator(pipeline) 
      .setEvaluator(new RegressionEvaluator) 
      .setEstimatorParamMaps(paramGrid) 
      .setNumFolds(numFolds) 

Fantastic, we have created the cross-validation estimator. Now it's time to train the LR model:

println("Training model with Random Forest algorithm")  
val cvModel = cv.fit(Preproessing.trainingData) 

Now that we have the fitted model, that means it is now capable of making predictions. So let's start evaluating the model on the train and validation set, and calculating RMSE, MSE, MAE, R-squared, and many more:

println("Evaluating model on train and validation set and calculating RMSE") 
val trainPredictionsAndLabels = cvModel.transform(Preproessing.trainingData).select("label", "prediction").map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd 
 
val validPredictionsAndLabels = cvModel.transform(Preproessing.validationData).select("label", "prediction").map { case Row(label: Double, prediction: Double) => (label, prediction) }.rdd 
 
val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels) 
val validRegressionMetrics = new RegressionMetrics(validPredictionsAndLabels) 

Great! We have managed to compute the raw prediction on the train and the test set. Let's hunt for the best model:

val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]

As already stated, by using RF, it is possible to measure the feature importance so that at a later stage, we can decide which features should be used and which ones are to be dropped from the DataFrame. Let's find the feature importance from the best model we just created for all features in ascending order, as follows:

val featureImportances = bestModel.stages.last.asInstanceOf[RandomForestRegressionModel].featureImportances.toArray 
val FI_to_List_sorted = featureImportances.toList.sorted.toArray  

Once we have the best fitted and cross-validated model, we can expect a good prediction accuracy. Now let's observe the results on the train and the validation set:

val output = "n=====================================================================n" + s"Param trainSample: ${Preproessing.trainSample}n" + 
      s"Param testSample: ${Preproessing.testSample}n" + 
      s"TrainingData count: ${Preproessing.trainingData.count}n" + 
      s"ValidationData count: ${Preproessing.validationData.count}n" + 
      s"TestData count: ${Preproessing.testData.count}n" +      "=====================================================================n" +   s"Param maxIter = ${MaxIter.mkString(",")}n" + 
      s"Param maxDepth = ${MaxDepth.mkString(",")}n" + 
      s"Param numFolds = ${numFolds}n" +      "=====================================================================n" +   s"Training data MSE = ${trainRegressionMetrics.meanSquaredError}n" + 
      s"Training data RMSE = ${trainRegressionMetrics.rootMeanSquaredError}n" + 
      s"Training data R-squared = ${trainRegressionMetrics.r2}n" + 
      s"Training data MAE = ${trainRegressionMetrics.meanAbsoluteError}n" + 
      s"Training data Explained variance = ${trainRegressionMetrics.explainedVariance}n" +      "=====================================================================n" +   s"Validation data MSE = ${validRegressionMetrics.meanSquaredError}n" + 
      s"Validation data RMSE = ${validRegressionMetrics.rootMeanSquaredError}n" + 
      s"Validation data R-squared = ${validRegressionMetrics.r2}n" + 
      s"Validation data MAE = ${validRegressionMetrics.meanAbsoluteError}n" + 
      s"Validation data Explained variance =
${validRegressionMetrics.explainedVariance}n" + "=====================================================================n" + s"CV params explained: ${cvModel.explainParams}n" + s"RF params explained: ${bestModel.stages.last.asInstanceOf[RandomForestRegressionModel].explainParams}n" + s"RF features importances:n ${Preproessing.featureCols.zip(FI_to_List_sorted).map(t => s"t${t._1} = ${t._2}").mkString("n")}n" + "=====================================================================n"

Now, we print the preceding results as follows:

println(results)
>>>
Param trainSample: 1.0
Param testSample: 1.0
TrainingData count: 141194
ValidationData count: 47124
TestData count: 125546
Param maxIter = 20
Param maxDepth = 20
Param numFolds = 10
Training data MSE = 1340574.3409399686
Training data RMSE = 1157.8317412042081
Training data R-squared = 0.7642745310548124
Training data MAE = 809.5917285994619
Training data Explained variance = 8337897.224852404
Validation data MSE = 4312608.024875177
Validation data RMSE = 2076.6819749001475
Validation data R-squared = 0.1369507149716651"
Validation data MAE = 1273.0714382935894
Validation data Explained variance = 8737233.110450774

So our predictive model shows an MAE of about 809.5917285994619 and 1273.0714382935894 for the training and test set respectively. The last result is important for understanding the feature importance (the preceding list is abridged to save space but you should receive the full list).

I have drawn both the categorical and continuous features, and their respective importance in Python, so I will not show the code here but only the graph. Let's see the categorical features showing feature importance as well as the corresponding feature number:

Figure 11: Random Forest categorical feature importance

From the preceding graph, it is clear that categorical features cat20, cat64, cat47, and cat69 are less important. Therefore, it would make sense to drop these features and retrain the Random Forest model to observe better performance.

Now let's see how the continuous features are correlated and contribute to the loss column. From the following figure, we can see that all continuous features are positively correlated with the loss column. This also signifies that these continuous features are not that important compared to the categorical ones we have seen in the preceding figure:

Figure 12: Correlations between the continuous features and the label

What we can learn from these two analyses is that we can naively drop some unimportant columns and train the Random Forest model to observe if there is any reduction in the MAE value for both the training and validation set. Finally, let's make a prediction on the test set:

println("Run prediction on the test set") 
cvModel.transform(Preproessing.testData) 
      .select("id", "prediction") 
      .withColumnRenamed("prediction", "loss") 
      .coalesce(1) // to get all the predictions in a single csv file                 
.write.format("com.databricks.spark.csv") .option("header", "true") .save("output/result_RF.csv")

Also, similar to LR, you can stop the Spark session by invoking the stop() method. Now the generated result_RF.csv file should contain the loss against each ID, that is, claim.

Comparative analysis and model deployment

You have already seen that the LR model is much easier to train for a small training dataset. However, we haven't experienced better accuracy compared to GBT and Random Forest models. However, the simplicity of the LR model is a very good starting point. On the other hand, we already argued that Random Forest would be the winner over GBT for several reasons, of course. Let's see the results in a table:

Now let's see how the predictions went for each model for 20 accidents or damage claims:

Figure 13: Loss prediction by i) LR, ii) GBT, and iii) Random Forest models

Therefore, based on table 2, it is clear that we should go with the Random Forest regressor to not only predict the insurance claim loss but also its production. Now we will see a quick overview of how to take our best model, that is, an Random Forest regressor into production. The idea is, as a data scientist, you may have produced an ML model and handed it over to an engineering team in your company for deployment in a production-ready environment.

Here, I provide a naïve approach, though IT companies must have their own way to deploy the models. Nevertheless, there will be a dedicated section at the end of this topic. This scenario can easily become a reality by using model persistence—the ability to save and load models that come with Spark. Using Spark, you can either:

  • Save and load a single model
  • Save and load a full pipeline

A single model is pretty simple, but less effective and mainly works on Spark MLlib-based model persistence. Since we are more interested in saving the best model, that is, the Random Forest regressor model, at first we will fit an Random Forest regressor using Scala, save it, and then load the same model back using Scala:

// Estimator algorithm 
val model = new RandomForestRegressor() 
                    .setFeaturesCol("features") 
                    .setLabelCol("label") 
                    .setImpurity("gini") 
                    .setMaxBins(20) 
                    .setMaxDepth(20) 
                    .setNumTrees(50) 
fittedModel = rf.fit(trainingData) 

We can now simply call the write.overwrite().save() method to save this model to local storage, HDFS, or S3, and the load method to load it right back for future use:

fittedModel.write.overwrite().save("model/RF_model")  
val sameModel = CrossValidatorModel.load("model/RF_model") 

Now the thing that we need to know is how to use the restored model for making predictions. Here's the answer:

sameModel.transform(Preproessing.testData) 
    .select("id", "prediction") 
    .withColumnRenamed("prediction", "loss") 
    .coalesce(1) 
    .write.format("com.databricks.spark.csv") 
    .option("header", "true") 
    .save("output/result_RF_reuse.csv") 
Figure 14: Spark model deployment for production

So far, we have only looked at saving and loading a single ML model but not a tuned or stable one. It might even provide you with many wrong predictions. Therefore, now the second approach might be more effective.

The reality is that, in practice, ML workflows consist of many stages, from feature extraction and transformation to model fitting and tuning. Spark ML provides pipelines to help users construct these workflows. Similarly, a pipeline with the cross-validated model can be saved and restored back the same way as we did in the first approach.

We fit the cross-validated model with the training set:

val cvModel = cv.fit(Preproessing.trainingData)   

Then we save the workflow/pipeline:

cvModel.write.overwrite().save("model/RF_model") 

Note that the preceding line of code will save the model in your preferred location with the following directory structure:

Figure 15: Saved model directory structure
//Then we restore the same model back:
val sameCV = CrossValidatorModel.load("model/RF_model") 
Now when you try to restore the same model, Spark will automatically pick the best one. Finally, we reuse this model for making a prediction as follows:
sameCV.transform(Preproessing.testData) 
      .select("id", "prediction") 
      .withColumnRenamed("prediction", "loss") 
      .coalesce(1) 
      .write.format("com.databricks.spark.csv") 
      .option("header", "true") 
      .save("output/result_RF_reuse.csv")  

Spark-based model deployment for large-scale dataset

In a production ready environment, we often need to deploy a pretrained models in scale. Especially, if we need to handle a massive amount of data. So our ML model has to face this scalability issue to perform continiously and with faster response. To overcome this issue, one of the main big data paradigms that Spark has brought for us is the introduction of in-memory computing (it supports dis based operation, though) and caching abstraction.

This makes Spark ideal for large-scale data processing and enables the computing nodes to perform multiple operations by accessing the same input data across multiple nodes in a computing cluster or cloud computing infrastructures (example, Amazon AWS, DigitalOcean, Microsoft Azure, or Google Cloud). For doing so, Spark supports four cluster managers (the last one is still experimental, though):

  • Standalone: A simple cluster manager included with Spark that makes it easy to set up a cluster.
  • Apache Mesos: A general cluster manager that can also run Hadoop MapReduce and service applications.
  • Hadoop YARN: The resource manager in Hadoop 2.
  • Kubernetes (experimental): In addition to the above, there is experimental support for Kubernetes. Kubernetes is an open-source platform for providing container-centric infrastructure. See more at https://spark.apache.org/docs/latest/cluster-overview.html.

You can upload your input dataset on Hadoop Distributed File System (HDFS) or S3 storage for efficient computing and storing big data cheaply. Then the spark-submit script in Spark’s bin directory is used to launch applications on any of those cluster modes. It can use all of the cluster managers through a uniform interface so you don’t have to configure your application specially for each one.

However, if your code depends on other projects, you will need to package them alongside your application in order to distribute the code to a Spark cluster. To do this, create an assembly jar (also called fat or uber jar) containing your code and its dependencies. Then ship the code where the data resides and execute your Spark jobs. Both the SBT and Maven have assembly plugins that should help you to prepare the jars.

When creating assembly jars, list Spark and Hadoop as dependencies as well. These need not be bundled since they are provided by the cluster manager at runtime. Once you have an assembled jar, you can call the script by passing your jar as follows:

  ./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

In the preceding command, some of the commonly used options are listed down as follows:

  • --class: The entry point for your application (example, org.apache.spark.examples.SparkPi).
  • --master: The master URL for the cluster (example, spark://23.195.26.187:7077).
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client.
  • --conf: Arbitrary Spark configuration property in key=value format.
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any.

For example, you can run the AllstateClaimsSeverityRandomForestRegressor script on a Spark standalone cluster in client deploy mode as follows:

./bin/spark-submit \
--class com.packt.ScalaML.InsuranceSeverityClaim.AllstateClaimsSeverityRandomForestRegressor\
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar

For more info see Spark website at https://spark.apache.org/docs/latest/submitting-applications.html. Nevertheless, you can find useful information from online blogs or books. By the way, I discussed this topic in details in one of my recently published books: Md. Rezaul Karim, Sridhar Alla, Scala and Spark for Big Data Analytics, Packt Publishing Ltd. 2017. See more at https://www.packtpub.com/big-data-and-business-intelligence/scala-and-spark-big-data-analytics.

Anyway, we will learn more on deploying ML models in production in upcoming chapters. Therefore, that's all I have to write for this chapter. 

Summary

In this chapter, we have seen how to develop a predictive model for analyzing insurance severity claims using some of the most widely used regression algorithms. We started with simple LR. Then we saw how we can improve performance using a GBT regressor. Then we experienced improved performance using ensemble techniques, such as the Random Forest regressor. Finally, we looked at performance comparative analysis between these models and chose the best model to deploy for production-ready environment.

In the next chapter, we will look at a new end-to-end project called Analyzing and Predicting Telecommunication Churn. Churn prediction is essential for businesses as it helps you detect customers who are likely to cancel a subscription, product, or service. It also minimizes customer defection. It does so by predicting which customers are more likely to cancel a subscription to a service.

 

Left arrow icon Right arrow icon
Download code icon Download Code

Key benefits

  • Explore machine learning techniques with prominent open source Scala libraries such as Spark ML, H2O, MXNet, Zeppelin, and DeepLearning4j
  • Solve real-world machine learning problems by delving complex numerical computing with Scala functional programming in a scalable and faster way
  • Cover all key aspects such as collection, storing, processing, analyzing, and evaluation required to build and deploy machine models on computing clusters using Scala Play framework.

Description

Machine learning has had a huge impact on academia and industry by turning data into actionable information. Scala has seen a steady rise in adoption over the past few years, especially in the fields of data science and analytics. This book is for data scientists, data engineers, and deep learning enthusiasts who have a background in complex numerical computing and want to know more hands-on machine learning application development. If you're well versed in machine learning concepts and want to expand your knowledge by delving into the practical implementation of these concepts using the power of Scala, then this book is what you need! Through 11 end-to-end projects, you will be acquainted with popular machine learning libraries such as Spark ML, H2O, DeepLearning4j, and MXNet. At the end, you will be able to use numerical computing and functional programming to carry out complex numerical tasks to develop, build, and deploy research or commercial projects in a production-ready environment.

Who is this book for?

If you want to leverage the power of both Scala and Spark to make sense of Big Data, then this book is for you. If you are well versed with machine learning concepts and wants to expand your knowledge by delving into the practical implementation using the power of Scala, then this book is what you need! Strong understanding of Scala Programming language is recommended. Basic familiarity with machine Learning techniques will be more helpful.

What you will learn

  • Apply advanced regression techniques to boost the performance of predictive models
  • Use different classification algorithms for business analytics
  • Generate trading strategies for Bitcoin and stock trading using ensemble techniques
  • Train Deep Neural Networks (DNN) using H2O and Spark ML
  • Utilize NLP to build scalable machine learning models
  • Learn how to apply reinforcement learning algorithms such as Q-learning for developing ML application
  • Learn how to use autoencoders to develop a fraud detection application
  • Implement LSTM and CNN models using DeepLearning4j and MXNet

Product Details

Country selected
Publication date, Length, Edition, Language, ISBN-13
Publication date : Jan 31, 2018
Length: 470 pages
Edition : 1st
Language : English
ISBN-13 : 9781788479042
Vendor :
Apache
Category :
Languages :
Concepts :

What do you get with a Packt Subscription?

Free for first 7 days. $19.99 p/m after that. Cancel any time!
Product feature icon Unlimited ad-free access to the largest independent learning library in tech. Access this title and thousands more!
Product feature icon 50+ new titles added per month, including many first-to-market concepts and exclusive early access to books as they are being written.
Product feature icon Innovative learning tools, including AI book assistants, code context explainers, and text-to-speech.
Product feature icon Thousands of reference materials covering every tech concept you need to stay up to date.
Subscribe now
View plans & pricing

Product Details

Publication date : Jan 31, 2018
Length: 470 pages
Edition : 1st
Language : English
ISBN-13 : 9781788479042
Vendor :
Apache
Category :
Languages :
Concepts :

Packt Subscriptions

See our plans and pricing
Modal Close icon
€18.99 billed monthly
Feature tick icon Unlimited access to Packt's library of 7,000+ practical books and videos
Feature tick icon Constantly refreshed with 50+ new titles a month
Feature tick icon Exclusive Early access to books as they're written
Feature tick icon Solve problems while you work with advanced search and reference features
Feature tick icon Offline reading on the mobile app
Feature tick icon Simple pricing, no contract
€189.99 billed annually
Feature tick icon Unlimited access to Packt's library of 7,000+ practical books and videos
Feature tick icon Constantly refreshed with 50+ new titles a month
Feature tick icon Exclusive Early access to books as they're written
Feature tick icon Solve problems while you work with advanced search and reference features
Feature tick icon Offline reading on the mobile app
Feature tick icon Choose a DRM-free eBook or Video every month to keep
Feature tick icon PLUS own as many other DRM-free eBooks or Videos as you like for just €5 each
Feature tick icon Exclusive print discounts
€264.99 billed in 18 months
Feature tick icon Unlimited access to Packt's library of 7,000+ practical books and videos
Feature tick icon Constantly refreshed with 50+ new titles a month
Feature tick icon Exclusive Early access to books as they're written
Feature tick icon Solve problems while you work with advanced search and reference features
Feature tick icon Offline reading on the mobile app
Feature tick icon Choose a DRM-free eBook or Video every month to keep
Feature tick icon PLUS own as many other DRM-free eBooks or Videos as you like for just €5 each
Feature tick icon Exclusive print discounts

Frequently bought together


Stars icon
Total 128.97
Scala for Machine Learning, Second Edition
€49.99
Scala Machine Learning Projects
€36.99
Modern Scala Projects
€41.99
Total 128.97 Stars icon

Table of Contents

12 Chapters
Analyzing Insurance Severity Claims Chevron down icon Chevron up icon
Analyzing and Predicting Telecommunication Churn Chevron down icon Chevron up icon
High Frequency Bitcoin Price Prediction from Historical and Live Data Chevron down icon Chevron up icon
Population-Scale Clustering and Ethnicity Prediction Chevron down icon Chevron up icon
Topic Modeling - A Better Insight into Large-Scale Texts Chevron down icon Chevron up icon
Developing Model-based Movie Recommendation Engines Chevron down icon Chevron up icon
Options Trading Using Q-learning and Scala Play Framework Chevron down icon Chevron up icon
Clients Subscription Assessment for Bank Telemarketing using Deep Neural Networks Chevron down icon Chevron up icon
Fraud Analytics Using Autoencoders and Anomaly Detection Chevron down icon Chevron up icon
Human Activity Recognition using Recurrent Neural Networks Chevron down icon Chevron up icon
Image Classification using Convolutional Neural Networks Chevron down icon Chevron up icon
Other Books You May Enjoy Chevron down icon Chevron up icon
Get free access to Packt library with over 7500+ books and video courses for 7 days!
Start Free Trial

FAQs

What is included in a Packt subscription? Chevron down icon Chevron up icon

A subscription provides you with full access to view all Packt and licnesed content online, this includes exclusive access to Early Access titles. Depending on the tier chosen you can also earn credits and discounts to use for owning content

How can I cancel my subscription? Chevron down icon Chevron up icon

To cancel your subscription with us simply go to the account page - found in the top right of the page or at https://subscription.packtpub.com/my-account/subscription - From here you will see the ‘cancel subscription’ button in the grey box with your subscription information in.

What are credits? Chevron down icon Chevron up icon

Credits can be earned from reading 40 section of any title within the payment cycle - a month starting from the day of subscription payment. You also earn a Credit every month if you subscribe to our annual or 18 month plans. Credits can be used to buy books DRM free, the same way that you would pay for a book. Your credits can be found in the subscription homepage - subscription.packtpub.com - clicking on ‘the my’ library dropdown and selecting ‘credits’.

What happens if an Early Access Course is cancelled? Chevron down icon Chevron up icon

Projects are rarely cancelled, but sometimes it's unavoidable. If an Early Access course is cancelled or excessively delayed, you can exchange your purchase for another course. For further details, please contact us here.

Where can I send feedback about an Early Access title? Chevron down icon Chevron up icon

If you have any feedback about the product you're reading, or Early Access in general, then please fill out a contact form here and we'll make sure the feedback gets to the right team. 

Can I download the code files for Early Access titles? Chevron down icon Chevron up icon

We try to ensure that all books in Early Access have code available to use, download, and fork on GitHub. This helps us be more agile in the development of the book, and helps keep the often changing code base of new versions and new technologies as up to date as possible. Unfortunately, however, there will be rare cases when it is not possible for us to have downloadable code samples available until publication.

When we publish the book, the code files will also be available to download from the Packt website.

How accurate is the publication date? Chevron down icon Chevron up icon

The publication date is as accurate as we can be at any point in the project. Unfortunately, delays can happen. Often those delays are out of our control, such as changes to the technology code base or delays in the tech release. We do our best to give you an accurate estimate of the publication date at any given time, and as more chapters are delivered, the more accurate the delivery date will become.

How will I know when new chapters are ready? Chevron down icon Chevron up icon

We'll let you know every time there has been an update to a course that you've bought in Early Access. You'll get an email to let you know there has been a new chapter, or a change to a previous chapter. The new chapters are automatically added to your account, so you can also check back there any time you're ready and download or read them online.

I am a Packt subscriber, do I get Early Access? Chevron down icon Chevron up icon

Yes, all Early Access content is fully available through your subscription. You will need to have a paid for or active trial subscription in order to access all titles.

How is Early Access delivered? Chevron down icon Chevron up icon

Early Access is currently only available as a PDF or through our online reader. As we make changes or add new chapters, the files in your Packt account will be updated so you can download them again or view them online immediately.

How do I buy Early Access content? Chevron down icon Chevron up icon

Early Access is a way of us getting our content to you quicker, but the method of buying the Early Access course is still the same. Just find the course you want to buy, go through the check-out steps, and you’ll get a confirmation email from us with information and a link to the relevant Early Access courses.

What is Early Access? Chevron down icon Chevron up icon

Keeping up to date with the latest technology is difficult; new versions, new frameworks, new techniques. This feature gives you a head-start to our content, as it's being created. With Early Access you'll receive each chapter as it's written, and get regular updates throughout the product's development, as well as the final course as soon as it's ready.We created Early Access as a means of giving you the information you need, as soon as it's available. As we go through the process of developing a course, 99% of it can be ready but we can't publish until that last 1% falls in to place. Early Access helps to unlock the potential of our content early, to help you start your learning when you need it most. You not only get access to every chapter as it's delivered, edited, and updated, but you'll also get the finalized, DRM-free product to download in any format you want when it's published. As a member of Packt, you'll also be eligible for our exclusive offers, including a free course every day, and discounts on new and popular titles.