We will be using the Bitcoin historical price data from Kaggle. For the real-time data, the Cryptocompare API will be used.
For training the ML algorithm, we use the Bitcoin Historical Price Data dataset, which is publicly available on Kaggle (version 10). It contains 1-minute OHLC (open, high, low, close) data for BTC-USD pairs from several exchanges.
When the project started, data for most exchanges was available from January 1, 2012 to May 31, 2017, but for the Bitstamp exchange it was available until October 20, 2017 (as it was for Coinbase, although that dataset became available later):
Figure 1: The Bitcoin historical dataset on Kaggle
Note that you need to be a registered user and be logged in to download the file. The file that we are using is bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv. Now, let us load the data and see what it contains. It has eight columns, including the Open and Close prices used below.
One of the most important parts of the data-science pipeline after data collection (which is, in a sense, outsourced here, since we use data collected by others) is data preprocessing: cleaning a dataset and transforming it to suit our needs.
Given our goal of predicting the direction of price change, we might ask ourselves whether having the actual price in dollars helps to achieve it. Historically, the price of Bitcoin has mostly been rising, so a regression fitted to the raw price would simply extrapolate further growth (whether this holds in the long run is yet to be seen).
One of the assumptions of this project is as follows: whether we are thinking about Bitcoin trading in November 2016, with a price of about $700, or in November 2017, with a price in the $6,500 to $7,000 range, the patterns in how people trade are similar. We also make several other assumptions.
Taking into account the goals of data preparation, Scala was chosen as an easy and interactive way to manipulate data:
import org.apache.spark.sql.SparkSession
val priceDataFileName: String = "bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv"
val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "E:/Exp/")
  .appName("Bitcoin Preprocessing")
  .getOrCreate()
// Spark 2.x has a built-in CSV source; the header row supplies the column names
val data = spark.read.format("csv").option("header", "true").load(priceDataFileName)
data.show(10)
println((data.count(), data.columns.size))
>>>
(3045857, 8)
In the preceding code, we load the data from the file downloaded from Kaggle and look at what is inside. There are 3,045,857 rows and 8 columns in the dataset. Then we create the Delta column, containing the difference between the closing and opening prices of each minute:
val dataWithDelta = data.withColumn("Delta", data("Close") - data("Open"))
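As a minimal illustration of the two derived columns, the following plain-Scala sketch (no Spark) applies the same per-row rules to a few in-memory candles: Delta is Close minus Open, and the label is 1 only when Delta is strictly positive. The `Candle` case class, the `DeltaLabels` helper, and the sample prices are hypothetical.

```scala
// Plain-Scala sketch (no Spark) of the per-minute Delta and label rules.
// `Candle` is a hypothetical stand-in for one row of the 1-minute OHLC data.
final case class Candle(open: Double, close: Double)

object DeltaLabels {
  // Delta = Close - Open: the price change within one minute
  def delta(c: Candle): Double = c.close - c.open

  // label = 1 if the price rose during the minute, 0 otherwise (including no change)
  def label(c: Candle): Int = if (delta(c) > 0) 1 else 0
}
```

For candles (700.0, 701.5), (701.5, 701.5), and (7000.0, 6990.0), the deltas are 1.5, 0.0, and -10.0, and the labels are 1, 0, and 0. A minute in which the price does not change gets label 0, just like the `otherwise(0)` branch in the Spark code.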
The following code labels our data by assigning 1 to rows whose Delta value is positive and 0 otherwise:
import org.apache.spark.sql.functions._
import spark.implicits._
// label = 1 if the price rose during the minute (Delta > 0), 0 otherwise
val dataWithLabels = dataWithDelta.withColumn("label", when($"Delta" > 0, 1).otherwise(0))
// rollingWindow and the two output paths are defined below
rollingWindow(dataWithLabels, 22, outputDataFilePath, outputLabelFilePath)
This code transforms the original dataset into time series data. It takes the Delta values of a window of consecutive rows (22 rows in this experiment, set by the window parameter) and makes a new row out of them. In this way, the first row has the Delta values from t0 to t21, the second one has the values from t1 to t22, and so on. The label of each new row (1 or 0) is the label of the minute immediately after its window, so the model learns to predict the direction of the next price change.
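The windowing logic can be sketched in memory with plain Scala. This is an illustration under simplifying assumptions (the data fits in a `Vector`, and nothing is dropped or written to disk), not the project's `rollingWindow` function; `RollingWindowSketch.windows` is a hypothetical helper that pairs each window of Delta values with the label of the row that follows it.

```scala
// Plain-Scala sketch of the rolling-window transformation (no Spark, no files).
// Each sample is `window` consecutive Delta values; its label is the label of the
// row immediately after the window.
object RollingWindowSketch {
  def windows(deltas: IndexedSeq[Double], labels: IndexedSeq[Int], window: Int)
      : Seq[(Seq[Double], Int)] = {
    require(deltas.length == labels.length, "deltas and labels must be aligned")
    require(window > 0, "window must be positive")
    // i runs up to length - window - 1, so labels(i + window) is always in range
    (0 until (deltas.length - window)).map { i =>
      (deltas.slice(i, i + window), labels(i + window))
    }
  }
}
```

With five rows and a window of 3, only two complete samples exist, because every sample needs a following row for its label. In general, the number of samples is the number of rows minus the window size.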
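Finally, we save X and Y into two files. The first 612,000 rows of the original dataset are cut off (dropFirstCount), so that we only consider data from the period when meaningful trading had started; 22 is the rolling window size, and the labels are binary, with the two classes 0 and 1: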
import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.sql.DataFrame
// Number of initial rows to skip (the early part of the history, before meaningful trading)
val dropFirstCount: Int = 612000
def rollingWindow(data: DataFrame, window: Int, xFilename: String,
                  yFilename: String): Unit = {
  var i = 0
  val xWriter = new BufferedWriter(new FileWriter(new File(xFilename)))
  val yWriter = new BufferedWriter(new FileWriter(new File(yFilename)))
  // Collects all rows to the driver, so the whole dataset must fit in driver memory
  val zippedData = data.rdd.zipWithIndex().collect()
  System.gc()
  val dataStratified = zippedData.drop(dropFirstCount) // slice off the first 612K rows
  while (i < (dataStratified.length - window)) {
    // Features: the Delta values of rows i .. i + window - 1
    val x = dataStratified
      .slice(i, i + window)
      .map(r => r._1.getAs[Double]("Delta")).toList
    // Label: the label of the row right after the window
    val y = dataStratified.apply(i + window)._1.getAs[Integer]("label")
    val stringToWrite = x.mkString(",")
    xWriter.write(stringToWrite + "\n")
    yWriter.write(s"$y\n")
    i += 1
    if (i % 10 == 0) {
      xWriter.flush()
      yWriter.flush()
    }
  }
  xWriter.close()
  yWriter.close()
}
The output file paths used in the preceding code are defined as follows:
val outputDataFilePath: String = "output/scala_test_x.csv"
val outputLabelFilePath: String = "output/scala_test_y.csv"
For real-time data, the Cryptocompare API is used, more specifically HistoMinute, which gives us access to OHLC data for the past seven days at most. The details of the API will be discussed in a section devoted to implementation, but the API response is very similar to our historical dataset, and this data is retrieved using a regular HTTP request. For example, a simple JSON response has the following structure:
{
  "Response": "Success",
  "Type": 100,
  "Aggregated": false,
  "Data": [
    {"time": 1510774800, "close": 7205, "high": 7205, "low": 7192.67, "open": 7198,
     "volumefrom": 81.73, "volumeto": 588726.94},
    {"time": 1510774860, "close": 7209.05, "high": 7219.91, "low": 7205, "open": 7205,
     "volumefrom": 16.39, "volumeto": 118136.61},
    ... (other price data)
  ],
  "TimeTo": 1510776180,
  "TimeFrom": 1510774800,
  "FirstValueInArray": true,
  "ConversionType": {"type": "force_direct", "conversionSymbol": ""}
}
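The `time` field holds Unix timestamps in seconds, and consecutive rows in the sample are 60 seconds apart (1510774800 and 1510774860). A small sketch such as the following, using only `java.time`, can convert these values to UTC and check that a batch of rows has no gaps before it is turned into features. `MinuteSpacing` is a hypothetical helper, not part of the project.

```scala
import java.time.Instant

// Sketch: convert HistoMinute Unix timestamps (seconds) to UTC instants and
// check that consecutive rows are exactly one step (60 s by default) apart.
object MinuteSpacing {
  def toUtc(epochSeconds: Long): Instant = Instant.ofEpochSecond(epochSeconds)

  // true if every consecutive pair of timestamps differs by exactly `step` seconds
  def evenlySpaced(times: Seq[Long], step: Long = 60L): Boolean =
    times.zip(times.drop(1)).forall { case (a, b) => b - a == step }
}
```

For example, the first timestamp in the sample, 1510774800, corresponds to 2017-11-15T19:40:00Z.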
Through Cryptocompare HistoMinute, we can get the open, high, low, close, volumefrom, and volumeto values for each minute of historical data. This data is stored for 7 days only; if you need more, use the hourly or daily endpoint. If data is not available because the coin is not traded in the specified currency, the API uses a BTC conversion.
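A fully formed HistoMinute URL can be assembled from its parameters as in the following sketch. The base URL `https://min-api.cryptocompare.com/data/histominute` and the parameter names `fsym`, `tsym`, `limit`, and `aggregate` are assumptions based on Cryptocompare's public min-api and should be checked against the current API documentation; `HistoMinuteUrl` is a hypothetical helper.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

// Sketch: assemble a HistoMinute request URL from its parameters.
// ASSUMPTION: base URL and parameter names (fsym, tsym, limit, aggregate) follow
// Cryptocompare's public min-api; verify them against the current documentation.
object HistoMinuteUrl {
  val BaseUrl = "https://min-api.cryptocompare.com/data/histominute"

  def build(fsym: String, tsym: String, limit: Int, aggregate: Int = 1): String = {
    def enc(s: String): String = URLEncoder.encode(s, StandardCharsets.UTF_8.name())
    val params = Seq(
      "fsym" -> fsym,           // source currency, e.g. BTC
      "tsym" -> tsym,           // target currency, e.g. USD
      "limit" -> limit.toString, // how many data points to request
      "aggregate" -> aggregate.toString) // minutes per data point
    params.map { case (k, v) => s"${enc(k)}=${enc(v)}" }.mkString(BaseUrl + "?", "&", "")
  }
}
```

Keeping URL construction separate from the HTTP call lets the REST client stay a thin wrapper that only fetches and parses whatever URL it is given.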
The following method takes a fully formed Cryptocompare API URL, with all parameters (such as currency, limit, and aggregation) already specified, and sends a GET request to it. It returns a Future that will hold the response body parsed into the data model, with the price list to be processed at a higher level:
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import play.api.libs.json.JsResult
import play.api.libs.ws._
import processing.model.CryptoCompareResponse
// The ExecutionContext is injected instead of using the global
// play.api.libs.concurrent.Execution context, which is deprecated in newer Play versions
class RestClient @Inject() (ws: WSClient)(implicit ec: ExecutionContext) {
  def getPayload(url: String): Future[JsResult[CryptoCompareResponse]] = {
    val request: WSRequest = ws.url(url)
    val future = request.get()
    // Validate the JSON body against the implicit Reads of the model class
    future.map { response => response.json.validate[CryptoCompareResponse] }
  }
}
In the preceding code segment, the CryptoCompareResponse class is the model of the API response. It has the following signature:
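import play.api.libs.json.{Json, Reads}
case class CryptoCompareResponse(Response: String, Type: Int,
                                 Aggregated: Boolean, Data: List[OHLC],
                                 FirstValueInArray: Boolean, TimeTo: Long,
                                 TimeFrom: Long)
object CryptoCompareResponse {
  // JSON fields without a matching constructor parameter (e.g. ConversionType) are ignored
  implicit val cryptoCompareResponseReads: Reads[CryptoCompareResponse] =
    Json.reads[CryptoCompareResponse]
}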
Similarly, the OHLC (open-high-low-close) class is a model class that maps each element of the Data array in the Cryptocompare API response. It has the following signature:
import play.api.libs.json.{Json, Reads}
case class OHLC(time: Long,
                open: Double,
                high: Double,
                low: Double,
                close: Double,
                volumefrom: Double,
                volumeto: Double)
object OHLC {
  implicit val implicitOHLCReads: Reads[OHLC] = Json.reads[OHLC]
}
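At prediction time, the live rows have to be turned into the same kind of feature vector as the training data. The sketch below assumes that the prediction service uses the same features, the last 22 values of Delta = close - open in chronological order; the source does not spell out the real-time feature code, so treat this as one plausible approach. `LiveFeatures.latestWindow` is a hypothetical helper, and the `OHLC` case class repeats the model class above so that the block is self-contained.

```scala
// Sketch: build one 22-value feature vector from the latest HistoMinute rows.
// ASSUMPTION: live features mirror training (Delta = close - open, oldest first).
final case class OHLC(time: Long, open: Double, high: Double, low: Double,
                      close: Double, volumefrom: Double, volumeto: Double)

object LiveFeatures {
  val WindowSize = 22

  // Returns None when fewer than `window` rows are available.
  def latestWindow(rows: Seq[OHLC], window: Int = WindowSize): Option[Vector[Double]] = {
    val deltas = rows.sortBy(_.time).map(r => r.close - r.open).toVector
    if (deltas.length < window) None else Some(deltas.takeRight(window))
  }
}
```

Sorting by `time` first means the result does not depend on the order in which the API returns the rows.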
Inside the project, in the package folder prediction.training, there is a Scala object in the file TrainGBT.scala. Before launching it, you have to check and adjust four settings in that file.
The code for training uses the Apache Spark ML library (and the libraries it depends on) to train the classifier, which means they have to be on your classpath to run it. The easiest way to do that (since the whole project uses SBT) is to run it from the project root folder by typing sbt "runMain prediction.training.TrainGBT" (run-main in older sbt versions), which resolves all dependencies and launches training.
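Depending on the number of iterations and the tree depth, training the model can take several hours. Now let us see how training is performed, using the gradient-boosted trees model as an example. After creating a SparkSession object (as in the preprocessing step), we define the schema of the feature file, with one Double column per time step, t0 to t21: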
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
// One nullable Double column per time step: t0, t1, ..., t21
val xSchema = StructType((0 until 22).map(i => StructField(s"t$i", DoubleType, true)))
Then we read the files using the schema defined above (plus a one-column schema for the labels). It was more convenient to generate two separate files in Scala, one for the data and one for the labels, so here we have to join them into a single DataFrame:
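import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
val ySchema = StructType(Array(StructField("y", DoubleType, true)))
val x = spark.read.format("csv").schema(xSchema).load(rootDir + "scala_test_x.csv")
val y_tmp = spark.read.format("csv").schema(ySchema).load(rootDir + "scala_test_y.csv")
val y = y_tmp.withColumn("y", y_tmp("y").cast(IntegerType))
// Join on a consecutive row number: zipWithIndex follows file order, whereas
// monotonically_increasing_id() is unique but not consecutive across partitions
def withRowId(df: DataFrame): DataFrame = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) },
  df.schema.add("id", LongType, nullable = false))
val data = withRowId(x).join(withRowId(y), "id")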
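Joining the two files on monotonically_increasing_id() relies on both DataFrames receiving the same ID for the same row number. Spark only guarantees that these IDs are unique and increasing; its documentation describes the current layout as the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. Because the X file is much larger than the Y file, the two may be split into different numbers of partitions. The plain-Scala sketch below reproduces that layout for hypothetical partition sizes to show how the IDs can diverge; `IdLayout` is an illustrative helper.

```scala
// Sketch of the ID layout documented for Spark's monotonically_increasing_id():
// partition ID in the upper 31 bits, record number within the partition in the
// lower 33 bits. Partition sizes passed in are hypothetical.
object IdLayout {
  def id(partition: Int, recordInPartition: Long): Long =
    (partition.toLong << 33) + recordInPartition

  // IDs for a dataset split into partitions of the given sizes, in file order
  def ids(partitionSizes: Seq[Int]): Seq[Long] =
    partitionSizes.zipWithIndex.flatMap { case (size, p) =>
      (0L until size.toLong).map(r => id(p, r))
    }
}
```

If X is read as two partitions of three rows and Y as one partition of six rows, only the first three IDs coincide, so an inner join on id would silently keep just three of the six rows.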
Next, Spark ML requires all features to be assembled into a single vector column:
import org.apache.spark.ml.feature.VectorAssembler
val featureAssembler = new VectorAssembler()
  .setInputCols((0 until 22).map(i => s"t$i").toArray) // "t0", "t1", ..., "t21"
  .setOutputCol("features")
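We split the data into training and test sets randomly in the proportion of 75% to 25%, after casting the y column to a Double label column (the complete listing below does this with a small UDF). We fix the seed so that the split is the same every time we run the training:
import org.apache.spark.sql.types.DoubleType
val dataWithLabels = data.withColumn("label", data("y").cast(DoubleType))
val Array(trainingData, testData) = dataWithLabels.randomSplit(Array(0.75, 0.25), 123)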
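The role of the seed can be illustrated with a plain-Scala sketch of a seeded random split. It follows the same idea as randomSplit(Array(0.75, 0.25), 123), where each row independently goes to the training part with probability 0.75, so the sizes are only approximately 75/25, but it is not Spark's implementation. `SeededSplit` is a hypothetical helper.

```scala
import scala.util.Random

// Sketch of a reproducible random split (not Spark's algorithm): each element
// independently goes to the first (training) part with probability trainFraction.
object SeededSplit {
  def split[A](data: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    val rng = new Random(seed)
    val draws = data.map(_ => rng.nextDouble()) // one uniform draw per element, in order
    val (train, test) = data.zip(draws).partition { case (_, u) => u < trainFraction }
    (train.map(_._1), test.map(_._1))
  }
}
```

Calling `split` twice with the same seed returns identical parts, which is what makes results comparable while tuning parameters.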
We then define the model, specifying which column holds the features and which holds the label, and setting its parameters (the maximum number of iterations and the random seed):
import org.apache.spark.ml.classification.GBTClassifier
val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setSeed(123)
Next, we create a pipeline with two stages, assembling the feature vector and running GBT:
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
  .setStages(Array(featureAssembler, gbt))
Then we define the evaluator, which measures how well the model is doing. Because the two classes are imbalanced, accuracy is a misleading measure; the area under the ROC curve is a better choice:
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val rocEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
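The area under the ROC curve has a convenient interpretation: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as one half. The following plain-Scala sketch computes it directly from that definition. It is quadratic in the number of examples, so it is for illustration only, and `RocAuc` is a hypothetical helper, not Spark's implementation.

```scala
// Sketch: ROC AUC as the fraction of (positive, negative) pairs in which the
// positive example is scored higher; ties count as 0.5. O(P * N), illustration only.
object RocAuc {
  def auc(scores: Seq[Double], labels: Seq[Int]): Double = {
    require(scores.length == labels.length, "scores and labels must be aligned")
    val pos = scores.zip(labels).collect { case (s, 1) => s }
    val neg = scores.zip(labels).collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one example of each class")
    val wins = for (p <- pos; n <- neg)
      yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.length.toDouble * neg.length)
  }
}
```

A model that gives every example the same score, which is effectively what always predicting the majority class does, gets an AUC of exactly 0.5, even though with 90% negative examples its accuracy would be 90%. This is why AUC is the more informative metric for imbalanced classes.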
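K-fold cross-validation is used to reduce the risk of overfitting when choosing hyperparameters: with k folds, it holds out one k-th of the training data at each iteration, trains the model on the rest, and then evaluates it on the held-out part. With numFolds = 10, as in the complete listing, that is one-tenth of the data at each iteration: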
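import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val numFolds = 10
val paramGrid = new ParamGridBuilder()
  .addGrid(gbt.maxIter, Seq(10))
  .addGrid(gbt.maxDepth, Seq(20))
  .addGrid(gbt.maxBins, Seq(5, 7, 9))
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(rocEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds)
  .setSeed(123)
val cvModel = cv.fit(trainingData)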
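The fold logic can be sketched in plain Scala. In this sketch, the data is shuffled with a fixed seed and assigned to k folds round-robin, and each fold is held out once while the rest is used for training. Spark's CrossValidator uses its own randomized fold assignment, so its fold sizes are only approximately equal; `KFoldSketch` is a hypothetical helper.

```scala
// Sketch of k-fold cross-validation splits (not Spark's implementation):
// shuffle with a fixed seed, assign folds round-robin, hold out each fold once.
object KFoldSketch {
  def folds[A](data: Seq[A], k: Int, seed: Long): Seq[(Seq[A], Seq[A])] = {
    require(k >= 2, "need at least 2 folds")
    val rng = new scala.util.Random(seed)
    val shuffled = rng.shuffle(data)
    // round-robin assignment after shuffling keeps fold sizes within 1 of each other
    val assigned = shuffled.zipWithIndex.map { case (a, i) => (a, i % k) }
    (0 until k).map { f =>
      val (heldOut, rest) = assigned.partition(_._2 == f)
      (rest.map(_._1), heldOut.map(_._1)) // (training part, validation part)
    }
  }
}
```

With numFolds = 10, each candidate model is validated on one-tenth of the training data and trained on the remaining nine-tenths, and this is repeated for every parameter combination in paramGrid.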
After we get the trained model (which can take an hour or more depending on the number of iterations and parameters we want to iterate on, specified in paramGrid), we then compute the predictions on the test data:
val predictions = cvModel.transform(testData)
Then we evaluate the quality of the predictions:
val roc = rocEvaluator.evaluate(predictions)
The trained model is saved for later usage by the prediction service:
val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]
gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" + System.nanoTime()/ 1000000 + ".model")
In summary, the code for model training is given as follows:
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.ml.{ Pipeline, PipelineModel }
import org.apache.spark.ml.classification.{ GBTClassificationModel,
GBTClassifier, RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator,
MulticlassClassificationEvaluator}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer,
VectorAssembler, VectorIndexer}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField,
StructType}
import org.apache.spark.sql.SparkSession
object TrainGradientBoostedTree {
def main(args: Array[String]): Unit = {
val maxBins = Seq(5, 7, 9)
val numFolds = 10
val maxIter: Seq[Int] = Seq(10)
val maxDepth: Seq[Int] = Seq(20)
val rootDir = "output/"
val spark = SparkSession
.builder()
.master("local[*]")
.config("spark.sql.warehouse.dir", "/home/user/spark/")
.appName("Bitcoin Preprocessing")
.getOrCreate()
val xSchema = StructType(Array(
StructField("t0", DoubleType, true),
StructField("t1", DoubleType, true),
StructField("t2", DoubleType, true),
StructField("t3", DoubleType, true),
StructField("t4", DoubleType, true),
StructField("t5", DoubleType, true),
StructField("t6", DoubleType, true),
StructField("t7", DoubleType, true),
StructField("t8", DoubleType, true),
StructField("t9", DoubleType, true),
StructField("t10", DoubleType, true),
StructField("t11", DoubleType, true),
StructField("t12", DoubleType, true),
StructField("t13", DoubleType, true),
StructField("t14", DoubleType, true),
StructField("t15", DoubleType, true),
StructField("t16", DoubleType, true),
StructField("t17", DoubleType, true),
StructField("t18", DoubleType, true),
StructField("t19", DoubleType, true),
StructField("t20", DoubleType, true),
StructField("t21", DoubleType, true)))
val ySchema = StructType(Array(StructField("y", DoubleType,
true)))
val x = spark.read.format("csv").schema(xSchema).load(rootDir +
"scala_test_x.csv")
val y_tmp =
spark.read.format("csv").schema(ySchema).load(rootDir +
"scala_test_y.csv")
import spark.implicits._
val y = y_tmp.withColumn("y", 'y.cast(IntegerType))
import org.apache.spark.sql.functions._
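import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.LongType
// Join the 2 datasets on a consecutive row number: zipWithIndex follows file order,
// whereas monotonically_increasing_id() is unique but not consecutive across partitions
def withRowId(df: DataFrame): DataFrame = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) },
  df.schema.add("id", LongType, nullable = false))
val data = withRowId(x).join(withRowId(y), "id")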
val featureAssembler = new VectorAssembler()
.setInputCols(Array("t0", "t1", "t2", "t3", "t4", "t5",
"t6", "t7", "t8", "t9", "t10", "t11",
"t12", "t13", "t14", "t15", "t16",
"t17","t18", "t19", "t20", "t21"))
.setOutputCol("features")
val encodeLabel = udf[Double, String] { case "1" => 1.0 case
"0" => 0.0 }
val dataWithLabels = data.withColumn("label",
encodeLabel(data("y")))
// 123 is the seed, fixed to get the same data split so we can tune params
val Array(trainingData, testData) =
dataWithLabels.randomSplit(Array(0.75, 0.25), 123)
val gbt = new GBTClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
.setMaxIter(10)
.setSeed(123)
val pipeline = new Pipeline()
.setStages(Array(featureAssembler, gbt))
// ***********************************************************
println("Preparing K-fold Cross Validation and Grid Search")
// ***********************************************************
val paramGrid = new ParamGridBuilder()
.addGrid(gbt.maxIter, maxIter)
.addGrid(gbt.maxDepth, maxDepth)
.addGrid(gbt.maxBins, maxBins)
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new BinaryClassificationEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(numFolds)
.setSeed(123)
// ************************************************************
println("Training model with GradientBoostedTrees algorithm")
// ************************************************************
// Train model. This also runs the indexers.
val cvModel = cv.fit(trainingData)
cvModel.save(rootDir + "cvGBT_22_binary_classes_" +
System.nanoTime() / 1000000 + ".model")
println("Evaluating model on test data and calculating area under ROC and PR curves")
// **********************************************************************
// Make a sample prediction
val predictions = cvModel.transform(testData)
// Select (prediction, true label) and compute test error.
val rocEvaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderROC")
val roc = rocEvaluator.evaluate(predictions)
val prEvaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
.setMetricName("areaUnderPR")
val pr = prEvaluator.evaluate(predictions)
val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]
gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" +
System.nanoTime()/1000000 +".model")
println("Area under ROC curve = " + roc)
println("Area under PR curve= " + pr)
predictions.select("label", "prediction").show(1)
spark.stop()
}
}
Now let us see how the training went:
>>>
Area under ROC curve = 0.6045355104779828
Area under PR curve= 0.3823834607704922
Therefore, the results are modest: the area under the ROC curve for the best GBT model is only about 0.60 (60.45%), where 0.5 corresponds to random guessing, and the area under the PR curve is about 0.38. Further hyperparameter tuning may improve these results.
We learned how a complete ML pipeline can be implemented, from collecting historical data, to transforming it into a format suitable for testing hypotheses. We also performed machine learning model training to carry out predictions.
You read an excerpt from a book written by Md. Rezaul Karim, titled Scala Machine Learning Projects. In this book, you will learn to build powerful machine learning applications for performing advanced numerical computing and functional programming.