In this subsection, we will perform some basic feature engineering and prepare a dataset that can be fed into the MLP classifier. Let's start by creating the SparkSession, which is the gateway for accessing Spark:
SparkSession spark = SparkSession
        .builder()
        .master("local[*]")
        .config("spark.sql.warehouse.dir", "/tmp/spark")
        .appName("SurvivalPredictionMLP")
        .getOrCreate();
Then let's read the training set and see a glimpse of it:
Dataset<Row> df = spark.sqlContext()
        .read()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load("data/train.csv");
df.show();
A snapshot of the dataset can be seen as follows:
Figure: A snapshot of the Titanic survival dataset
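If you want to check the column types that were inferred from the CSV (and spot the null-prone columns early), printSchema() gives a quick overview:
// Print the schema inferred from the CSV header and data
df.printSchema();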
Now we can see that the training set has both categorical and numerical features. In addition, some features, such as PassengerId and Ticket, are not important. The same applies to the Name feature, unless we manually create features based on the titles it contains; however, let's keep it simple. Some columns also contain null values, so careful consideration and cleaning are required.
I ignore the PassengerId, Name, and Ticket columns. Apart from these, the Sex column is categorical, so I encode the passengers numerically based on male and female. The Embarked column is encoded too: S as 0, C as 1, and Q as 2.
For this, we can write user-defined functions (UDFs) called normSex and normEmbarked for the Sex and Embarked columns, respectively. Let's see their signatures:
private static UDF1<String, Option<Integer>> normEmbarked = (String d) -> {
    if (null == d)
        return Option.apply(null);
    else if (d.equals("S"))
        return Some.apply(0);
    else if (d.equals("C"))
        return Some.apply(1);
    else
        return Some.apply(2);
};
Therefore, this UDF takes a String and encodes it as an integer. The normSex UDF works similarly:
private static UDF1<String, Option<Integer>> normSex = (String d) -> {
    if (null == d)
        return Option.apply(null);
    else if (d.equals("male"))
        return Some.apply(0);
    else
        return Some.apply(1);
};
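Note that, just like the imputation UDFs we will define shortly, these two UDFs must be registered with Spark before callUDF() can resolve them by name. Assuming the two lambdas above, the registration looks like this:
// Register the encoding UDFs so that callUDF("normSex", ...) and
// callUDF("normEmbarked", ...) can find them by name
spark.sqlContext().udf().register("normSex", normSex, DataTypes.IntegerType);
spark.sqlContext().udf().register("normEmbarked", normEmbarked, DataTypes.IntegerType);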
So we can now select only the useful columns, applying the aforementioned UDFs to the Sex and Embarked columns:
Dataset<Row> projection = df.select(
        col("Survived"),
        col("Fare"),
        callUDF("normSex", col("Sex")).alias("Sex"),
        col("Age"),
        col("Pclass"),
        col("Parch"),
        col("SibSp"),
        callUDF("normEmbarked", col("Embarked")).alias("Embarked"));
projection.show();
Now we have been able to convert the categorical columns into numeric ones; however, as we can see, there are still null values. What can we do about them? We can either drop the rows containing null values altogether or impute them with the mean value of the corresponding column. I believe the second approach is better, since it does not throw away training samples.
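As a quick sanity check (an illustrative addition, not part of the original pipeline), we can count the missing entries per column before imputing; isNull() and count() are standard Dataset operations:
// Count the missing entries in the null-prone columns
long missingAge = projection.filter(col("Age").isNull()).count();
long missingEmbarked = projection.filter(col("Embarked").isNull()).count();
System.out.println("Missing Age: " + missingAge + ", missing Embarked: " + missingEmbarked);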
Now, again, we can write UDFs for this null imputation. However, for that we need to know some statistics about those numerical columns. Unfortunately, we cannot compute the summary statistics on a DataFrame directly; therefore, we have to convert the DataFrame into a JavaRDD<Vector>. We also ignore the null entries when calculating these statistics:
JavaRDD<Vector> statsRDD = projection.toJavaRDD().map(row -> Vectors.dense(
        row.<Double>getAs("Fare"),
        row.isNullAt(3) ? 0d : row.<Double>getAs("Age")));
Now let's compute the multivariate statistical summary. The summary statistics will then be used to calculate meanAge and meanFare, which we fill into the corresponding missing entries of these two features (note that, because the missing ages were zero-filled above, the computed mean age is biased slightly low; we accept this approximation to keep things simple):
MultivariateStatisticalSummary summary = Statistics.colStats(statsRDD.rdd());
double meanFare = summary.mean().apply(0);
double meanAge = summary.mean().apply(1);
Now let's create two more UDFs for the null imputation on the Age and Fare columns:
UDF1<String, Option<Double>> normFare = (String d) -> {
    if (null == d)
        return Some.apply(meanFare);
    else
        return Some.apply(Double.parseDouble(d));
};
Therefore, we have defined a UDF that fills in the meanFare value whenever the Fare entry is missing. Now let's create another UDF for the Age column:
UDF1<String, Option<Double>> normAge = (String d) -> {
    if (null == d)
        return Some.apply(meanAge);
    else
        return Some.apply(Double.parseDouble(d));
};
Now we need to register these UDFs as well:
spark.sqlContext().udf().register("normFare", normFare, DataTypes.DoubleType);
spark.sqlContext().udf().register("normAge", normAge, DataTypes.DoubleType);
Therefore, let's apply the preceding UDFs for null imputation:
Dataset<Row> finalDF = projection.select(
        col("Survived"),
        callUDF("normFare", col("Fare").cast("string")).alias("Fare"),
        col("Sex"),
        callUDF("normAge", col("Age").cast("string")).alias("Age"),
        col("Pclass"),
        col("Parch"),
        col("SibSp"),
        col("Embarked"));
finalDF.show();
Great! We can now see that the null values in the Age and Fare columns have been replaced with their mean values. However, the numeric values are still not scaled, so it would be a better idea to standardize them. For that, we reuse the mean and variance computed earlier and store them in a model for later scaling:
Vector stddev = Vectors.dense(Math.sqrt(summary.variance().apply(0)), Math.sqrt(summary.variance().apply(1)));
Vector mean = Vectors.dense(summary.mean().apply(0), summary.mean().apply(1));
StandardScalerModel scaler = new StandardScalerModel(stddev, mean);
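As a quick check (again, an illustrative addition), standardizing the mean point itself should yield a vector of (approximately) zeros, since the scaler transforms each column as (x - mean) / stddev:
// Standardizing the column means should produce roughly the zero vector
Vector check = scaler.transform(Vectors.dense(meanFare, meanAge));
System.out.println(check); // expected: approximately [0.0, 0.0]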
Then we need encoders for the numeric values (that is, Integer and Double) and for the feature vector, along with tuple encoders for (label, features) pairs:
Encoder<Integer> integerEncoder = Encoders.INT();
Encoder<Double> doubleEncoder = Encoders.DOUBLE();
Encoder<Vector> vectorEncoder = Encoders.kryo(Vector.class);
// Tuple encoders for (label, features) pairs
Encoder<Tuple2<Integer, Vector>> intPairEncoder = Encoders.tuple(integerEncoder, vectorEncoder);
Encoder<Tuple2<Double, Vector>> doublePairEncoder = Encoders.tuple(doubleEncoder, vectorEncoder);
Then we can create a VectorPair consisting of the label (that is, Survived) and the features. Here, the encoding basically creates a scaled feature vector:
JavaRDD<VectorPair> scaledRDD = finalDF.toJavaRDD().map(row -> {
    VectorPair vectorPair = new VectorPair();
    // The label is the Survived column, cast to double
    vectorPair.setLabel(row.<Integer>getAs("Survived").doubleValue());
    // The features are scaled and one-hot encoded by the helper shown below
    vectorPair.setFeatures(Util.getScaledVector(
            row.<Double>getAs("Fare"),
            row.<Double>getAs("Age"),
            row.<Integer>getAs("Pclass"),
            row.<Integer>getAs("Sex"),
            row.isNullAt(7) ? 0d : row.<Integer>getAs("Embarked"),
            scaler));
    return vectorPair;
});
In the preceding code block, the getScaledVector() method performs the actual scaling operation. The signature of this method can be seen as follows:
public static org.apache.spark.mllib.linalg.Vector getScaledVector(double fare, double age,
        double pclass, double sex, double embarked, StandardScalerModel scaler) {
    // Standardize the two continuous features (Fare and Age)
    org.apache.spark.mllib.linalg.Vector scaledContinuous = scaler.transform(Vectors.dense(fare, age));
    // One-hot encode the categorical features
    Tuple3<Double, Double, Double> pclassFlat = flattenPclass(pclass);
    Tuple3<Double, Double, Double> embarkedFlat = flattenEmbarked(embarked);
    Tuple2<Double, Double> sexFlat = flattenSex(sex);
    return Vectors.dense(
            scaledContinuous.apply(0),
            scaledContinuous.apply(1),
            sexFlat._1(),
            sexFlat._2(),
            pclassFlat._1(),
            pclassFlat._2(),
            pclassFlat._3(),
            embarkedFlat._1(),
            embarkedFlat._2(),
            embarkedFlat._3());
}
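The flatten* helpers themselves are not shown in this section (they live in the Util class), but the tuple sizes tell us they one-hot encode the categorical values. A plausible sketch, assuming that layout, looks like the following; treat it as illustrative rather than the exact implementation:
// Hypothetical one-hot encoders matching the tuple sizes used above
private static Tuple2<Double, Double> flattenSex(double sex) {
    return new Tuple2<>(sex == 0 ? 1d : 0d,   // male
                        sex == 1 ? 1d : 0d);  // female
}
private static Tuple3<Double, Double, Double> flattenPclass(double pclass) {
    return new Tuple3<>(pclass == 1 ? 1d : 0d,
                        pclass == 2 ? 1d : 0d,
                        pclass == 3 ? 1d : 0d);
}
private static Tuple3<Double, Double, Double> flattenEmbarked(double embarked) {
    return new Tuple3<>(embarked == 0 ? 1d : 0d,   // S
                        embarked == 1 ? 1d : 0d,   // C
                        embarked == 2 ? 1d : 0d);  // Q
}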
Since we planned to use a Spark ML-based classifier (that is, an MLP implementation), we need to convert this RDD of VectorPair into a DataFrame:
Dataset<Row> scaledDF = spark.createDataFrame(scaledRDD, VectorPair.class);
Finally, let's see how the resulting DataFrame looks:
scaledDF.show();
Up to this point, we have been able to prepare our features. However, they are still MLlib-based vectors, so we need to further convert them into ML vectors:
Dataset<Row> scaledData2 = MLUtils.convertVectorColumnsToML(scaledDF);
Fantastic! We're now almost done preparing a training set that can be consumed by the MLP classifier. Since we also need to evaluate the model's performance, let's randomly split the data into training and test sets, allocating 80% for training and 20% for testing. These will be used to train and evaluate the model, respectively:
Dataset<Row> data = scaledData2.toDF("features", "label");
Dataset<Row>[] datasets = data.randomSplit(new double[]{0.80, 0.20}, 12345L);
Dataset<Row> trainingData = datasets[0];
Dataset<Row> validationData = datasets[1];
Alright. Now that we have the training set ready, we can move on to training the MLP model.
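For orientation, here is a minimal sketch of that next step using Spark ML's MultilayerPerceptronClassifier. The input width (10, matching the feature vector we built: 2 scaled continuous values + 2 Sex flags + 3 Pclass flags + 3 Embarked flags) and the output width (2 classes) follow from this section; the hidden-layer size and iteration count are illustrative placeholders:
// Illustrative MLP setup; the hyperparameters here are placeholders
MultilayerPerceptronClassifier mlp = new MultilayerPerceptronClassifier()
        .setLayers(new int[]{10, 16, 2}) // input, hidden, output layer sizes
        .setMaxIter(100)
        .setSeed(12345L);
MultilayerPerceptronClassificationModel model = mlp.fit(trainingData);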