In this subsection, we will develop a predictive analytics model for predicting the severity of accidental loss claims filed by clients. We start by importing the required libraries:
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel}
import org.apache.spark.ml.{ Pipeline, PipelineModel }
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.tuning.ParamGridBuilder
import org.apache.spark.ml.tuning.CrossValidator
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.mllib.evaluation.RegressionMetrics
Then we create an active Spark session as the entry point to the application. In addition, we import spark.implicits._, which is required for implicit conversions such as converting RDDs to DataFrames:
val spark = SparkSessionCreate.createSession()
import spark.implicits._
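Here, SparkSessionCreate is a small helper object of this project. A minimal sketch of what such a helper might look like, assuming a local master and a placeholder application name (the actual project may use different settings):

import org.apache.spark.sql.SparkSession

object SparkSessionCreate {
  // Hypothetical sketch of the helper; the real project may use a different
  // master URL, application name, or other configuration.
  def createSession(): SparkSession = {
    SparkSession
      .builder()
      .master("local[*]") // run locally using all available cores
      .appName("AnalyzingInsuranceSeverityClaims") // assumed application name
      .getOrCreate()
  }
}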
Then we define some hyperparameters, such as the number of folds for cross-validation, the maximum number of iterations, the value of the regularization parameter, the value of the tolerance, and the elastic net parameter, as follows:
val numFolds = 10
val MaxIter: Seq[Int] = Seq(1000)
val RegParam: Seq[Double] = Seq(0.001)
val Tol: Seq[Double] = Seq(1e-6)
val ElasticNetParam: Seq[Double] = Seq(0.001)
Well, now we create an LR estimator:
val model = new LinearRegression()
.setFeaturesCol("features")
.setLabelCol("label")
Now let's build a pipeline estimator by chaining the transformer and the LR estimator:
println("Building ML pipeline")
val pipeline = new Pipeline()
.setStages((Preproessing.stringIndexerStages
:+ Preproessing.assembler) :+ model)
Spark ML pipelines have the following components (a short sketch illustrating the Transformer/Estimator distinction follows the list):
- DataFrame: Used as the central data store where all the original data and intermediate results are stored.
- Transformer: A transformer transforms one DataFrame into another by adding additional feature columns. Transformers are stateless, meaning that they don't have any internal memory and behave exactly the same each time they are used.
- Estimator: An estimator is some sort of ML model. In contrast to a transformer, an estimator contains an internal state representation and is highly dependent on the history of the data that it has already seen.
- Pipeline: Chains the preceding components, DataFrame, Transformer, and Estimator together.
- Parameter: ML algorithms have many knobs to tweak. These are called hyperparameters, and the values learned by an ML algorithm to fit data are called parameters.
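To make the difference between a Transformer and an Estimator concrete, here is a minimal sketch; the toy column names (x1, x2, y) and the toy DataFrame are hypothetical and not part of this project's code (it relies on the spark.implicits._ import made earlier):

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression

// A Transformer is stateless: it simply maps one DataFrame to another
val toyAssembler = new VectorAssembler()
  .setInputCols(Array("x1", "x2")) // hypothetical numeric input columns
  .setOutputCol("features")

// An Estimator learns from data: fit() returns a Transformer (the fitted model)
val toyDF = Seq((1.0, 2.0, 3.0), (2.0, 1.0, 4.0), (3.0, 5.0, 7.0)).toDF("x1", "x2", "y")
val toyLR = new LinearRegression().setFeaturesCol("features").setLabelCol("y")
val toyModel = toyLR.fit(toyAssembler.transform(toyDF)) // a LinearRegressionModel, itself a Transformer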
Before we start performing the cross-validation, we need a param grid. So let's create the param grid by specifying the maximum number of iterations, the value of the regularization parameter, the value of the tolerance, and the elastic net parameter, as follows:
val paramGrid = new ParamGridBuilder()
.addGrid(model.maxIter, MaxIter)
.addGrid(model.regParam, RegParam)
.addGrid(model.tol, Tol)
.addGrid(model.elasticNetParam, ElasticNetParam)
.build()
Now, for better and more stable performance, let's prepare K-fold cross-validation and grid search as part of the model tuning. As you can probably guess, I am going to perform 10-fold cross-validation. Feel free to adjust the number of folds based on your setting and dataset:
println("Preparing K-fold Cross Validation and Grid Search: Model tuning")
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(numFolds)
Fantastic - we have created the cross-validation estimator. Now it's time to train the LR model:
println("Training model with Linear Regression algorithm")
val cvModel = cv.fit(Preproessing.trainingData)
Now that we have a fitted model, it is capable of making predictions. So let's evaluate the model on the training and validation sets and calculate the RMSE, MSE, MAE, R-squared, and so on:
println("Evaluating model on train and validation set and calculating RMSE")
val trainPredictionsAndLabels = cvModel.transform(Preproessing.trainingData)
.select("label", "prediction")
.map { case Row(label: Double, prediction: Double)
=> (label, prediction) }.rdd
val validPredictionsAndLabels = cvModel.transform(Preproessing.validationData)
.select("label", "prediction")
.map { case Row(label: Double, prediction: Double)
=> (label, prediction) }.rdd
val trainRegressionMetrics = new RegressionMetrics(trainPredictionsAndLabels)
val validRegressionMetrics = new RegressionMetrics(validPredictionsAndLabels)
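Alternatively, the ML-package RegressionEvaluator we imported earlier can compute an individual metric directly on the prediction DataFrame; a minimal sketch, reusing the fitted cvModel:

val validPredictions = cvModel.transform(Preproessing.validationData)
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse") // other supported metrics: "mse", "mae", "r2"
println(s"Validation RMSE (RegressionEvaluator) = ${evaluator.evaluate(validPredictions)}")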
Great! We have managed to compute the raw predictions on the train and validation sets. Let's hunt for the best model:
val bestModel = cvModel.bestModel.asInstanceOf[PipelineModel]
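Since the last stage of the best pipeline is the fitted linear regression model, we can also inspect what it has learned; a short sketch using the same stage extraction as the explainParams call further below:

// Extract the fitted LinearRegressionModel (the last pipeline stage) and inspect it
val bestLRModel = bestModel.stages.last.asInstanceOf[LinearRegressionModel]
println(s"Intercept: ${bestLRModel.intercept}")
println(s"Number of coefficients: ${bestLRModel.coefficients.size}")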
Once we have the best fitted and cross-validated model, we can expect good prediction accuracy. Now let's observe the results on the train and the validation set:
val results = "n=====================================================================n" + s"Param trainSample: ${Preproessing.trainSample}n" +
s"Param testSample: ${Preproessing.testSample}n" +
s"TrainingData count: ${Preproessing.trainingData.count}n" +
s"ValidationData count: ${Preproessing.validationData.count}n" +
s"TestData count: ${Preproessing.testData.count}n" + "=====================================================================n" + s"Param maxIter = ${MaxIter.mkString(",")}n" +
s"Param numFolds = ${numFolds}n" + "=====================================================================n" + s"Training data MSE = ${trainRegressionMetrics.meanSquaredError}n" +
s"Training data RMSE = ${trainRegressionMetrics.rootMeanSquaredError}n" +
s"Training data R-squared = ${trainRegressionMetrics.r2}n" +
s"Training data MAE = ${trainRegressionMetrics.meanAbsoluteError}n" +
s"Training data Explained variance = ${trainRegressionMetrics.explainedVariance}n" + "=====================================================================n" + s"Validation data MSE = ${validRegressionMetrics.meanSquaredError}n" +
s"Validation data RMSE = ${validRegressionMetrics.rootMeanSquaredError}n" +
s"Validation data R-squared = ${validRegressionMetrics.r2}n" +
s"Validation data MAE = ${validRegressionMetrics.meanAbsoluteError}n" +
s"Validation data Explained variance = ${validRegressionMetrics.explainedVariance}n" +
s"CV params explained: ${cvModel.explainParams}n" +
s"LR params explained: ${bestModel.stages.last.asInstanceOf[LinearRegressionModel].explainParams}n" + "=====================================================================n"
Now, we print the preceding results as follows:
println(results)
>>>
Building Machine Learning pipeline
Reading data from data/insurance_train.csv file
Null values exist in the DataFrame
Training model with Linear Regression algorithm
=====================================================================
Param trainSample: 1.0
Param testSample: 1.0
TrainingData count: 141194
ValidationData count: 47124
TestData count: 125546
=====================================================================
Param maxIter = 1000
Param numFolds = 10
=====================================================================
Training data MSE = 4460667.3666198505
Training data RMSE = 2112.0292059107164
Training data R-squared = -0.1514435541595276
Training data MAE = 1356.9375609756164
Training data Explained variance = 8336528.638733305
=====================================================================
Validation data MSE = 4839128.978963534
Validation data RMSE = 2199.802031766389
Validation data R-squared = -0.24922962724089603
Validation data MAE = 1356.419484419514
Validation data Explained variance = 8724661.329105612
CV params explained: estimator: estimator for selection (current: pipeline_d5024480c670)
estimatorParamMaps: param maps for the estimator (current: [Lorg.apache.spark.ml.param.ParamMap;@2f0c9855)
evaluator: evaluator used to select hyper-parameters that maximize the validated metric (current: regEval_00c707fcaa06)
numFolds: number of folds for cross validation (>= 2) (default: 3, current: 10)
seed: random seed (default: -1191137437)
LR params explained: aggregationDepth: suggested depth for treeAggregate (>= 2) (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.001)
featuresCol: features column name (default: features, current: features)
fitIntercept: whether to fit an intercept term (default: true)
labelCol: label column name (default: label, current: label)
maxIter: maximum number of iterations (>= 0) (default: 100, current: 1000)
predictionCol: prediction column name (default: prediction)
regParam: regularization parameter (>= 0) (default: 0.0, current: 0.001)
solver: the solver algorithm for optimization. If this is not set or empty, default value is 'auto' (default: auto)
standardization: whether to standardize the training features before fitting the model (default: true)
tol: the convergence tolerance for iterative algorithms (>= 0) (default: 1.0E-6, current: 1.0E-6)
weightCol: weight column name. If this is not set or empty, we treat all instance weights as 1.0 (undefined)
=====================================================================
So our predictive model shows an MAE of about 1356.94 on the training set and about 1356.42 on the validation set. However, the MAE is much lower on the Kaggle public and private leaderboards (go to https://www.kaggle.com/c/allstate-claims-severity/leaderboard), where the best entries show MAEs of 1096.92532 and 1109.70772, respectively.
Wait! We are not done yet. We still need to make a prediction on the test set:
println("Run prediction on the test set")
cvModel.transform(Preproessing.testData)
.select("id", "prediction")
.withColumnRenamed("prediction", "loss")
.coalesce(1) // to get all the predictions in a single csv file
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("output/result_LR.csv")
The preceding code saves the predictions under output/result_LR.csv. Note that Spark writes a directory at that path; thanks to the coalesce(1) call, it contains a single CSV part file. If we open that file, we should observe the predicted loss against each ID, that is, each claim. We will see the contents for LR, RF, and GBT at the end of this chapter. Finally, it is always a good idea to stop the Spark session by invoking the spark.stop() method:
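println("Stopping the Spark session")
spark.stop() // releases the resources held by the underlying SparkContext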
An ensemble method is a learning algorithm that creates a model composed of a set of other base models. Spark ML supports two major tree-based ensemble algorithms, gradient-boosted trees (GBT) and random forest. We will now see whether we can improve the prediction accuracy by significantly reducing the MAE using GBT.
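As a preview, switching to GBT only means swapping the model stage of the pipeline for a GBT estimator; a minimal sketch (the hyperparameter value shown is a placeholder, not the one tuned in the next section):

import org.apache.spark.ml.regression.GBTRegressor

// The LR estimator above could be replaced by a GBT estimator in the same pipeline
val gbtModel = new GBTRegressor()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setMaxIter(20) // placeholder value; tuning is covered in the next section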