A recommendation system is a software application that learns from user data, such as preferences, actions (clicks, for example), and browsing history, and generates recommendations: products the system determines will appeal to the user in the near future. In this tutorial, we will learn to build a recommendation system with Scala and Apache Spark.
This article is an excerpt taken from Modern Scala Projects, written by Ilango Gurusamy.
The following diagram is representative of a typical recommendation system:
Recommendation system
The preceding diagram can be thought of as a recommendation ecosystem, with the recommendation system at its heart. The system relies on three entities, shown in the diagram.
Implementation is documented in the following subsections. All code is developed in the IntelliJ code editor. The very first step is to create an empty Scala project called Chapter7.
Let's create a Scala project called Chapter7 with the following artifacts:
Let's break down the project's structure:
At this point, we will start developing code in the IntelliJ code editor. We will start with the AirlineWrapper Scala file and end with the deployment of the final application JAR into Spark with spark-submit.
Let's create the trait definition. The trait will hold the SparkSession variable, schema definitions for the datasets, and methods to build a dataframe:
trait RecWrapper { }
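The excerpt never shows how session itself is created. A minimal sketch, assuming a lazily initialized SparkSession running against a local master (the master URL and application name here are assumptions, not taken from the book), might look like this inside the trait:

import org.apache.spark.sql.SparkSession

// Hypothetical sketch: a lazily initialized SparkSession held by the trait.
lazy val session: SparkSession = SparkSession
  .builder()
  .master("local[*]")            // assumption: local development mode
  .appName("Chapter7-RecSystem") // assumption: name not shown in the excerpt
  .getOrCreate()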
Next, let's create a schema for past weapon sales orders.
Let's create a schema for the past sales order dataset:
val salesOrderSchema: StructType = StructType(Array(
  StructField("sCustomerId", IntegerType, false),
  StructField("sCustomerName", StringType, false),
  StructField("sItemId", IntegerType, true),
  StructField("sItemName", StringType, true),
  StructField("sItemUnitPrice", DoubleType, true),
  StructField("sOrderSize", DoubleType, true),
  StructField("sAmountPaid", DoubleType, true)
))
Next, let's create a schema for weapon sales leads.
Here is a schema definition for the weapon sales lead dataset:
val salesLeadSchema: StructType = StructType(Array(
  StructField("sCustomerId", IntegerType, false),
  StructField("sCustomerName", StringType, false),
  StructField("sItemId", IntegerType, true),
  StructField("sItemName", StringType, true)
))
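Both schema definitions rely on Spark SQL's type classes; if they are not already in scope in your trait file, the following import brings them in:

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType, DoubleType}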
Next, let's build a weapon sales order dataframe.
Let's invoke the read method on our SparkSession instance and cache it. We will call this method later from the RecSystem object:
def buildSalesOrders(dataSet: String): DataFrame = {
  session.read
    .format("com.databricks.spark.csv")
    .option("header", true)
    .schema(salesOrderSchema)
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(dataSet)
    .cache()
}
Next up, let's build a sales leads dataframe:
def buildSalesLeads(dataSet: String): DataFrame = {
  session.read
    .format("com.databricks.spark.csv")
    .option("header", true)
    .schema(salesLeadSchema)
    .option("nullValue", "")
    .option("treatEmptyValuesAsNulls", "true")
    .load(dataSet)
    .cache()
}
This completes the trait. Overall, it looks like this:
trait RecWrapper {
  // 1) Create a lazy SparkSession instance and call it session.
  // 2) Create a schema for the past sales orders dataset.
  // 3) Create a schema for the sales lead dataset.
  // 4) Write a method that creates a dataframe holding past sales order data;
  //    it takes in the sales order dataset and returns a dataframe.
  // 5) Write a method that creates a dataframe holding sales lead data.
}
Bring in the following imports:
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
Create a Scala object called RecSystem:
object RecSystem extends App with RecWrapper { }
Before going any further, bring in the following imports:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
Inside this object, start by loading the past sales order data. This will be our training data. Load the sales order dataset, as follows:
val salesOrdersDf = buildSalesOrders("sales\\PastWeaponSalesOrders.csv")
Verify the schema. This is what the schema looks like:
salesOrdersDf.printSchema()

root
 |-- sCustomerId: integer (nullable = true)
 |-- sCustomerName: string (nullable = true)
 |-- sItemId: integer (nullable = true)
 |-- sItemName: string (nullable = true)
 |-- sItemUnitPrice: double (nullable = true)
 |-- sOrderSize: double (nullable = true)
 |-- sAmountPaid: double (nullable = true)
Here is a partial view of a dataframe displaying past weapon sales order data:
Partial view of dataframe displaying past weapon sales order data
Now, we have what we need to create a dataframe of ratings:
val ratingsDf: DataFrame = salesOrdersDf.map(salesOrder =>
  Rating(
    salesOrder.getInt(0),
    salesOrder.getInt(2),
    salesOrder.getDouble(6)
  )
).toDF("user", "item", "rating")
Save all and compile the project at the command line:
C:\Path\To\Your\Project\Chapter7>sbt compile
You are likely to run into the following error:
[error] C:\Path\To\Your\Project\Chapter7\src\main\scala\com\packt\modern\chapter7\RecSystem.scala:50:50: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
[error] val ratingsDf: DataFrame = salesOrdersDf.map( salesOrder =>
[error]                                               ^
[error] two errors found
[error] (compile:compileIncremental) Compilation failed
To fix this, place the following import statement just above the declaration of the ratings dataframe. The code should now look like this:
import session.implicits._

val ratingsDf: DataFrame = salesOrdersDf.map(salesOrder =>
  Rating(
    salesOrder.getInt(0),
    salesOrder.getInt(2),
    salesOrder.getDouble(6)
  )
).toDF("user", "item", "rating")
Save and recompile the project. This time, it compiles just fine. Next, make sure the Rating class from the org.apache.spark.mllib.recommendation package is imported. The following code transforms the ratings dataframe that we obtained previously into its RDD equivalent:
val ratings: RDD[Rating] = ratingsDf.rdd.map(row =>
  Rating(
    row.getInt(0),
    row.getInt(1),
    row.getDouble(2)
  )
)
println("Ratings RDD is: " + ratings.take(10).mkString(" "))
The following few lines of code are very important. We will use the ALS algorithm from Spark MLlib to create and train a MatrixFactorizationModel, which takes an RDD[Rating] object as input. The ALS train method requires a combination of training hyperparameters: the ratings RDD itself, the rank (the number of latent factors in the model), the number of iterations, and lambda, the regularization parameter.
Let's get started by bringing in the following imports:
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
Now, let's get down to training the matrix factorization model using the ALS algorithm.
Let's train a matrix factorization model given an RDD of ratings by customers (users) for certain items (products). Our train method on the ALS algorithm will take the following four parameters:
val ratingsModel: MatrixFactorizationModel = ALS.train(
  ratings,
  6,    /* the rank */
  10,   /* number of iterations */
  15.0  /* lambda, or regularization parameter */
)
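As a quick sanity check (not part of the original listing), the trained model exposes the learned user and product factor matrices, which you can count to confirm that training produced a factor vector for every customer and item:

// Optional sanity check: the model holds one latent factor vector per user and per product.
println("Number of user factor vectors: " + ratingsModel.userFeatures.count())
println("Number of product factor vectors: " + ratingsModel.productFeatures.count())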
Next, we load the sales lead file and convert it into a tuple format:
val weaponSalesLeadDf = buildSalesLeads("sales\\ItemSalesLeads.csv")
In the next section, we will display the new weapon sales lead dataframe.
First, we must invoke the show method:
println("Weapons Sales Lead dataframe is: ") weaponSalesLeadDf.show
Here is a view of the weapon sales lead dataframe:
View of weapon sales lead dataframe
Next, create a version of the sales lead dataframe structured as (customer, item) tuples:
val customerWeaponsSystemPairDf: DataFrame = weaponSalesLeadDf.map(salesLead =>
  (
    salesLead.getInt(0),
    salesLead.getInt(2)
  )
).toDF("user", "item")
In the next section, let's display the dataframe that we just created.
Let's invoke the show method, as follows:
println("The Customer-Weapons System dataframe as tuple pairs looks like: ") customerWeaponsSystemPairDf.show
Here is a screenshot of the new customer-weapons-system dataframe as tuple pairs:
New customer-weapons-system dataframe as tuple pairs
Next, we will convert the preceding dataframe into an RDD:
val customerWeaponsSystemPairRDD: RDD[(Int, Int)] =
  customerWeaponsSystemPairDf.rdd.map(row =>
    (row.getInt(0), row.getInt(1))
  )
/*
 Note: as far as the algorithm is concerned, "user" corresponds to a customer,
 and "product" (or item) corresponds to a weapons system.
*/
We previously created a MatrixFactorizationModel, trained on the weapons system sales orders dataset. We are now in a position to predict how each customer country may rate a weapon system in the future. In the next section, we will generate these predictions.
Here is how we will generate predictions. The predict method of our model is designed to do just that. It will generate a predictions RDD that we call weaponRecs, which represents predicted ratings for weapons systems that the customer nations (listed in the past sales order data) have not rated before:
val weaponRecs: RDD[Rating] = ratingsModel.predict(customerWeaponsSystemPairRDD).distinct()
Next up, we will display the final predictions.
Here is how to display the predictions, lined up in tabular format:
println("Future ratings are: " + weaponRecs.foreach(rating => { println( "Customer: " + rating.user + " Product: " + rating.product + " Rating: " + rating.rating ) } ) )
The following table displays how each nation is expected to rate a certain system in the future, that is, a weapon system that they did not rate earlier:
System rating by each nation
Our recommendation system proved itself capable of generating future predictions.
Up until now, we have not said how all of the preceding code is compiled and deployed. We will look at this in the next section.
Invoke sbt compile at the root folder of your Chapter7 project. You should get the following output:
Output on compiling the project
Besides loading build.sbt, the compile task also loads settings from assembly.sbt, which we will create below.
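The contents of build.sbt itself are not reproduced in this excerpt. A minimal sketch of what it might contain is shown below; the Scala and Spark versions are placeholders, not necessarily the ones used in the book:

// Hypothetical build.sbt sketch; the versions below are assumptions.
name := "Chapter7"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-sql"   % "2.3.1" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.3.1" % "provided"
)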
We have not yet talked about the assembly.sbt file. Our Scala-based Spark application is a Spark job that will be submitted to a (local) Spark cluster as a JAR file. Apart from the Spark libraries, this JAR also needs our other dependencies bundled into it for the recommendation system job to complete successfully. The name fat JAR comes from all of these dependencies being bundled into one JAR. To build such a fat JAR, we need the sbt-assembly plugin. This explains the need to create a new assembly.sbt and add the assembly plugin.
Create a new assembly.sbt in your IntelliJ project view and save it under your project folder, as follows:
Creating assembly.sbt
Paste the following contents into the newly created assembly.sbt (under the project folder). The output should look like this:
Output on placing contents of assembly.sbt
The sbt-assembly plugin, version 0.14.7, gives us the ability to run an sbt assembly task. With that, we are one step closer to building a fat (or uber) JAR. This action is documented in the next step.
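The screenshot above shows the exact contents used in the book. For reference, an assembly.sbt that pulls in sbt-assembly 0.14.7 typically consists of a single line along these lines:

// Registers the sbt-assembly plugin so that the assembly task becomes available.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")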
Issue the sbt assembly command, as follows:
Running the sbt assembly command
This time, the assembly task loads the assembly plugin declared in assembly.sbt. However, the assembly then halts because of a common deduplicate error: multiple copies of certain dependency files are pulled in, and these duplicates must be resolved before the assembly task can complete successfully. To address this, build.sbt needs an upgrade.
The following lines of code need to be added to build.sbt:
Code lines for upgrading the build.sbt file
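The exact settings added in the book are shown in the preceding screenshot. A commonly used way to resolve such deduplicate errors with sbt-assembly 0.14.x is a merge strategy along the following lines; treat this as a sketch rather than the book's exact configuration:

// Sketch of a typical merge strategy for duplicate files encountered during sbt assembly.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}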
To test the effect of your changes, save this and go to the command line to reissue the sbt assembly task.
Run the assembly task, as follows:
Rerunning the assembly task
This time, the settings in the assembly.sbt file are loaded. The task completes successfully. To verify, drill down to the target folder. If everything went well, you should see a fat JAR, as follows:
Output as a JAR file
Our JAR file under the target folder is the recommendation system application's JAR file that needs to be deployed into Spark. This is documented in the next step.
The spark-submit command is how we will deploy the application into Spark. Here are two formats for the spark-submit command; the first is a longer one that sets more parameters than the second:

spark-submit --class "com.packt.modern.chapter7.RecSystem" --master local[2] --deploy-mode client --driver-memory 16g --num-executors 2 --executor-memory 2g --executor-cores 2 <path-to-jar>
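The second, shorter format is not reproduced in this excerpt; a minimal version that relies on Spark's defaults and only names the class, the master, and the JAR might look like this:

spark-submit --class "com.packt.modern.chapter7.RecSystem" --master local[2] <path-to-jar>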
Leaning on the first, longer format, let's submit our Spark job, supplying various parameters to it:
Parameters for Spark
The different parameters are explained as follows:
Tabular explanation of parameters for Spark Job
We used Spark's support for recommendations to build a prediction model that generates recommendations, leveraging Spark's alternating least squares (ALS) algorithm to implement a collaborative filtering recommendation system.
If you've enjoyed reading this post, do check out the book Modern Scala Projects to gain insights into data that will help organizations have a strategic and competitive advantage.