When you get down to the nuts and bolts of building your solution, there are so many options for tools, tech, and approaches that it can be easy to feel overwhelmed. However, as alluded to in the previous sections, a lot of this complexity can be abstracted away so that you can see the bigger picture through some back-of-the-envelope architecture and design. This is always a useful exercise once you know what problem you are going to try to solve, and it is something I recommend doing before you make any detailed choices about implementation.
To give you an idea of how this works in practice, what follows are a few worked-through examples where a team has to create a high-level ML systems design for some typical business problems. These problems are similar to ones I have encountered before and will likely be similar to ones you will encounter in your own work.
Example 1: Batch anomaly detection service
You work for a tech-savvy taxi ride company with a fleet of thousands of cars. The organization wants to start making ride times more consistent and to understand longer journeys in order to improve customer experience and thereby increase retention and return business. Your ML team is employed to create an anomaly detection service to find rides that have unusual ride time or ride length behaviors. You all get to work, and your data scientists find that if you perform clustering on sets of rides using the features of ride distance and time, you can clearly identify outliers worth investigating by the operations team. The data scientists present the findings to the CTO and other stakeholders before getting the go-ahead to develop this into a service that will provide an outlier flag as a new field in one of the main tables of the company's internal analysis tool.
In this example, we will simulate some data to show how the taxi company's data scientists could proceed. All the code is contained in the Chapter1/batch-anomaly folder in the repository for this book: https://github.com/PacktPublishing/Machine-Learning-Engineering-with-Python/tree/main/Chapter01. This will be true of all code snippets shown in this book:
- First, let's define a function that will simulate some ride distances based on random distributions from numpy and return a numpy array containing the results. The reason for the repeated lines is so that we can create some base behavior and some anomalies in the data, and so that you can clearly match the values against the speeds we will generate for each set of taxis in the next step:

    import numpy as np

    def simulate_ride_distances():
        ride_dists = np.concatenate(
            (
                10 * np.random.random(size=370),  # typical distances
                30 * np.random.random(size=10),   # anomalously long distances
                10 * np.random.random(size=10),   # typical distances (paired with high speeds next)
                10 * np.random.random(size=10)    # typical distances (paired with low speeds next)
            )
        )
        return ride_dists
- We can now do the exact same thing for speeds, and again we have split the taxis into sets of 370, 10, 10, and 10 so that we can create some data with 'typical' behavior and some sets of anomalies, while allowing the values to be clearly matched with those from the distances function:

    def simulate_ride_speeds():
        ride_speeds = np.concatenate(
            (
                np.random.normal(loc=30, scale=5, size=370),  # typical speeds
                np.random.normal(loc=30, scale=5, size=10),   # typical speeds (paired with long distances)
                np.random.normal(loc=50, scale=10, size=10),  # anomalously fast rides
                np.random.normal(loc=15, scale=4, size=10)    # anomalously slow rides
            )
        )
        return ride_speeds
- We can now use both of these helper functions inside a function that calls them and brings the results together to create a simulated dataset containing ride IDs, speeds, distances, and times. The result is returned as a pandas DataFrame for use in modeling:

    import datetime
    import pandas as pd

    def simulate_ride_data():
        ride_dists = simulate_ride_distances()
        ride_speeds = simulate_ride_speeds()
        ride_times = ride_dists / ride_speeds

        # Assemble into a DataFrame
        df = pd.DataFrame(
            {
                'ride_dist': ride_dists,
                'ride_time': ride_times,
                'ride_speed': ride_speeds
            }
        )
        # Create simple ride IDs based on today's date and the row index
        ride_ids = datetime.datetime.now().strftime("%Y%m%d") + df.index.astype(str)
        df['ride_id'] = ride_ids
        return df
We can then run the simulation in lieu of getting the data from the taxi firm's system:
    df = simulate_ride_data()
- Now, we get to the core of what data scientists produce in their projects: a simple function that wraps some sklearn code and returns a dictionary with the clustering run metadata and results. We include the relevant imports here for ease:

    from sklearn.preprocessing import StandardScaler
    from sklearn.cluster import DBSCAN
    from sklearn import metrics

    def cluster_and_label(data, create_and_show_plot=True):
        data = StandardScaler().fit_transform(data)
        db = DBSCAN(eps=0.3, min_samples=10).fit(data)

        # Mark the core samples found by DBSCAN
        core_samples_mask = np.zeros_like(db.labels_, dtype=bool)
        core_samples_mask[db.core_sample_indices_] = True
        labels = db.labels_

        # Number of clusters in labels, ignoring noise (-1) if present
        n_clusters_ = len(set(labels)) - (1 if -1 in labels else 0)
        n_noise_ = list(labels).count(-1)

        run_metadata = {
            'nClusters': n_clusters_,
            'nNoise': n_noise_,
            'silhouetteCoefficient': metrics.silhouette_score(data, labels),
            'labels': labels,
        }
        # The optional plotting triggered by create_and_show_plot is omitted here for brevity
        return run_metadata
Finally, if we use the results of the simulation from Step 4 and apply the machine learning code, we get the original taxi dataset with a set of labels telling us whether the taxi ride was anomalous (labeled -1, DBSCAN's noise label) or not (labeled 0, the main cluster):
    X = df[['ride_dist', 'ride_time']]
    results = cluster_and_label(X, create_and_show_plot=False)
    df['label'] = results['labels']
Then, if you plot the results, with outliers labeled as black triangles, you get something like Figure 1.5:
Figure 1.5 – An example set of results from performing clustering on some taxi ride data
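For reference, a minimal plotting sketch (an assumed addition, not the book's plotting helper) that produces this kind of view from the labeled DataFrame could look like this:

    import matplotlib.pyplot as plt

    # Split the rides into DBSCAN noise points (label -1) and clustered points
    outliers = df[df['label'] == -1]
    clustered = df[df['label'] != -1]

    fig, ax = plt.subplots(figsize=(8, 5))
    ax.scatter(clustered['ride_dist'], clustered['ride_time'], alpha=0.5, label='Clustered rides')
    ax.scatter(outliers['ride_dist'], outliers['ride_time'], c='black', marker='^', label='Outliers')
    ax.set_xlabel('Ride distance')
    ax.set_ylabel('Ride time')
    ax.legend()
    plt.show()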
Now that you have a basic model that works, you have to start thinking about how to pull this into an engineered solution – how could you do it?
Well, since the solution here is going to support longer-running investigations by another team, there is no need for a very low-latency solution. The stakeholders agree that the insights from clustering can be delivered at the end of each day. Working with the data-science part of the team, the ML engineers (led by you) understand that if clustering is run daily, this provides enough data to give appropriate clusters, but doing the runs any more frequently could lead to poorer results due to smaller amounts of data. So, a daily batch process is agreed upon.
What do you do next? Well, you know the frequency of runs is daily, but the volume of data is still very high, so it makes sense to leverage a distributed computing paradigm. Therefore, you decide to use Apache Spark. You know that the end consumer of the data is a table in a SQL database, so you need to work with the database team to design an appropriate handover of the results. Due to security and reliability concerns, it is not a good idea to write to the production database directly. You therefore agree that another database in the cloud will be used as an intermediate staging area for the data, which the main database can query against on its daily builds.
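Although the detailed implementation comes later in the book, a minimal sketch of what this daily batch job could look like in PySpark is given below. The storage path, table name, and JDBC connection details are all hypothetical, and the clustering simply reuses the cluster_and_label function from above by collecting the day's modest feature set to the driver (a more heavily distributed design might instead apply the clustering per region with a pandas UDF):

    import datetime
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("daily-ride-anomaly-detection").getOrCreate()

    # Read the day's rides from the firm's data store (path is hypothetical)
    run_date = datetime.date.today().isoformat()
    rides = spark.read.parquet(f"s3://taxi-ride-data/rides/{run_date}/")

    # Collect the per-day features and reuse the data scientists' clustering function
    rides_pd = rides.select("ride_id", "ride_dist", "ride_time").toPandas()
    results = cluster_and_label(rides_pd[["ride_dist", "ride_time"]], create_and_show_plot=False)
    rides_pd["label"] = results["labels"]

    # Write the labeled rides to the agreed staging table in the cloud database
    (
        spark.createDataFrame(rides_pd)
        .write
        .format("jdbc")
        .option("url", "jdbc:postgresql://staging-db:5432/analytics")  # hypothetical connection
        .option("dbtable", "staging.ride_anomalies")
        .mode("append")
        .save()
    )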
It might not seem like much technical work has been done here, but actually, you have already performed the high-level system design for your project. The rest of this book tells you how to fill in the gaps in the following diagram!
Figure 1.6 – Example 1 workflow
Let's now move on to the next example!
Example 2: Forecasting API
In this example, you are working for the logistics arm of a large retail chain. To maximize the flow of goods, the company would like to help regional logistics planners get ahead of particularly busy periods and to avoid product sell-outs. After discussions with stakeholders and subject matter experts across the business, it is agreed that the ability for planners to dynamically request and explore forecasts for particular warehouse items through a web-hosted dashboard is optimal. This allows the planners to understand likely future demand profiles before they make orders.
The data scientists come good again and find that the data has very predictable behavior at the level of any individual store. They decide to use the Facebook Prophet library for their modeling to help speed up the process of training many different models.
This example will use the open Rossmann Store Sales dataset from Kaggle, which can be found here: https://www.kaggle.com/pratyushakar/rossmann-store-sales:
- First, we read in the data from the folder where we have extracted it. We will perform all the following steps on the train dataset provided in the download, but treat this as an entire dataset that we wish to split into training and test sets anyway:

    import pandas as pd

    df = pd.read_csv('./data/rossman/train.csv')
- Secondly, the data scientists prepped an initial subset of the data to work with, so we will do the same. We do some basic tidying up, but the key points are that we select data for store number 4 in the dataset, and only for the days when the store is open:

    df['Date'] = pd.to_datetime(df['Date'])
    df.rename(columns={'Date': 'ds', 'Sales': 'y'}, inplace=True)

    # Keep only store 4 and the days it was open
    df_store = df[
        (df['Store'] == 4) &
        (df['Open'] == 1)
    ].reset_index(drop=True)
    df_store = df_store.sort_values('ds', ascending=True)
- The data scientists then developed a little function that takes some supplied data, an index to delineate the size of the training set, and some seasonality parameters, before returning the predictions of a Prophet model trained on the training set, along with the train and test splits:

    from fbprophet import Prophet

    def train_predict(df, train_index, seasonality):
        # Grab the train/test split
        df_train = df.copy().iloc[0:train_index]
        df_test = df.copy().iloc[train_index:]

        model = Prophet(
            yearly_seasonality=seasonality['yearly'],
            weekly_seasonality=seasonality['weekly'],
            daily_seasonality=seasonality['daily'],
            interval_width=0.95
        )

        # Train and predict
        model.fit(df_train)
        predicted = model.predict(df_test)
        return predicted, df_train, df_test
- Before applying this function, we can define the relevant seasonality settings in a dictionary:

    seasonality = {
        'yearly': True,
        'weekly': True,
        'daily': False
    }
- Finally, we can apply the function as the data scientists envisaged:

    train_index = int(0.8 * df_store.shape[0])
    predicted, df_train, df_test = train_predict(
        df=df_store,
        train_index=train_index,
        seasonality=seasonality
    )
Running this model and plotting the predicted values against the ground truth gives a plot like that in Figure 1.7:
Figure 1.7 – Forecasting store sales
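A minimal plotting sketch (an assumed addition, not taken from the book's repository) for comparing the forecast with the held-out ground truth in this way could look like the following:

    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(12, 5))
    ax.plot(df_train['ds'], df_train['y'], 'k.', alpha=0.5, label='Training data')
    ax.plot(df_test['ds'], df_test['y'], 'r.', alpha=0.5, label='Ground truth')
    ax.plot(predicted['ds'], predicted['yhat'], 'b-', label='Forecast')
    ax.fill_between(predicted['ds'], predicted['yhat_lower'], predicted['yhat_upper'],
                    color='blue', alpha=0.2, label='95% interval')
    ax.set_xlabel('Date')
    ax.set_ylabel('Sales')
    ax.legend()
    plt.show()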
One issue here is that implementing a forecasting model like the one above for every store can quickly lead to hundreds or even thousands of models if the chain gathers enough data. Another issue is that not all stores are on the resource planning system used at the company yet, so some planners would like to retrieve forecasts for other stores they know are similar to their own. It is agreed that if such users can explore regional profiles they believe are similar to their own, then they can still make optimal decisions.
Given this and the customer requirements for dynamic, ad hoc requests, you quickly rule out a full batch process. Batch wouldn't cover the use case for regions not on the core system, and it wouldn't allow for dynamic retrieval of up-to-date forecasts via the website, including forecasts over a variety of future time horizons. Serving forecasts on demand also means you can save on compute: you don't need to manage the storage and updating of thousands of forecasts every day, and your resources can be focused on model training.
Therefore, you decide that a web-hosted API with an endpoint that can return forecasts as needed by the user makes the most sense. To give efficient responses, you have to consider what happens in a typical user session. By workshopping with the potential users of the dashboard, you quickly realize that although the requests are dynamic, most planners will focus on particular items of interest in any one session, and they will not look at many regions. This helps you design a data, forecast, and model caching strategy, so that after the user makes their first selections, subsequent results can be returned more quickly for a better user experience. This leads to the rough system sketch in Figure 1.8:
Figure 1.8 – Example 2 workflow
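To make the idea concrete, here is a minimal sketch of what such a forecasting endpoint with naive per-store model caching might look like. The choice of FastAPI, the lru_cache decorator, the local CSV data source, and the endpoint path are all illustrative assumptions rather than the design ultimately settled on:

    from functools import lru_cache

    import pandas as pd
    from fastapi import FastAPI
    from fbprophet import Prophet

    app = FastAPI()

    @lru_cache(maxsize=128)
    def get_or_train_model(store_id: int) -> Prophet:
        # Hypothetical data source: in practice this would read from the
        # company's sales data store rather than a local CSV
        df = pd.read_csv('./data/rossman/train.csv', parse_dates=['Date'])
        df = df[(df['Store'] == store_id) & (df['Open'] == 1)]
        df = df.rename(columns={'Date': 'ds', 'Sales': 'y'}).sort_values('ds')
        model = Prophet(yearly_seasonality=True, weekly_seasonality=True,
                        daily_seasonality=False, interval_width=0.95)
        model.fit(df[['ds', 'y']])
        return model

    @app.get("/forecast/{store_id}")
    def forecast(store_id: int, horizon_days: int = 28):
        # Cached per store, so repeat requests within a session return quickly
        model = get_or_train_model(store_id)
        future = model.make_future_dataframe(periods=horizon_days)
        predicted = model.predict(future)
        return (
            predicted[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
            .tail(horizon_days)
            .to_dict(orient='records')
        )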
Next, let's look at the final example.
Example 3: Streamed classification
In this final example, you are working for a web-based company that wants to classify users based on their usage patterns as targets for different types of advertising, in order to more effectively target marketing spend. For example, if the user uses the site less frequently, we may want to entice them with more aggressive discounts. One of the key requirements from the business is that the end results become part of the data landed in a data store used by other applications.
Based on these requirements, your team determines that a streaming application is the simplest solution that ticks all the boxes. The data engineers focus their efforts on building the streaming and data store infrastructure, while the ML engineer works to wrap up the classification model the data science team has trained on historical data. The base algorithm that the data scientists settle on is implemented in sklearn, which we will work through below by applying it to a marketing dataset that would be similar to that produced in this use case.
This hypothetical example aligns with a lot of classic datasets, including the Bank Marketing dataset from the UCI Machine Learning Repository, https://archive.ics.uci.edu/ml/datasets/Bank+Marketing#. The following example code uses this dataset. Remember that all of the following code is available in the book's GitHub repository as in the other examples:
- First, we will read in the data, which is stored in a folder labeled data in the same directory as the script we are building:

    import pandas as pd

    df = pd.read_csv('./data/bank/bank.csv', delimiter=';', decimal=',')
- Next, we define the features we would like to use in our model and define our feature matrix, X, and target variable vector, y. The target variable will be translated to a numerical value: 1 if the customer went with the proposed product, and 0 if they did not. Note that we assume the features have been selected in this case via robust exploratory data analysis, which is not covered here:

    cat_feature_cols = ["marital", "education", "contact", "default", "housing", "loan", "poutcome"]
    num_feature_cols = ["age", "pdays", "previous", "emp.var.rate", "euribor3m", "nr.employed"]
    feature_cols = cat_feature_cols + num_feature_cols

    X = df[feature_cols].copy()
    y = df['y'].apply(lambda x: 1 if x == 'yes' else 0).copy()
- Before moving on to modeling, we split the data into an 80/20 training and test split:

    from sklearn.model_selection import train_test_split

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
- We then perform some very basic feature engineering and preparation by one-hot encoding all of the categorical variables, being careful to only train the transformer on the training set:

    from sklearn.preprocessing import OneHotEncoder

    enc = OneHotEncoder(handle_unknown='ignore')
    X_train_cat_encoded = enc.fit_transform(X_train[cat_feature_cols])
    X_test_cat_encoded = enc.transform(X_test[cat_feature_cols])
- We then standardize the numerical variables in a similar way:

    from sklearn.preprocessing import StandardScaler

    scaler = StandardScaler()
    X_train_num_scaled = scaler.fit_transform(X_train[num_feature_cols])
    X_test_num_scaled = scaler.transform(X_test[num_feature_cols])
- We then have to bring the numerical and categorical data together into one set:

    import numpy as np

    X_train = np.concatenate((X_train_cat_encoded.toarray(), X_train_num_scaled), axis=1)
    X_test = np.concatenate((X_test_cat_encoded.toarray(), X_test_num_scaled), axis=1)
- Now we are ready for modeling. The dataset has imbalanced classes, so the data scientists have suggested that we use the SMOTE algorithm, which is contained within the imblearn package, to perform oversampling of the minority class. This creates a balanced classification dataset:

    from imblearn.over_sampling import SMOTE

    sm = SMOTE()
    X_balanced, y_balanced = sm.fit_resample(X_train, y_train)
- The core code that the data scientists created can now be applied. They come up with a series of different variants of code based around a simple random forest classification model:

    from sklearn.model_selection import KFold
    from sklearn.model_selection import cross_val_score
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import f1_score

    # Define the classifier and fit it on the balanced training data
    rfc = RandomForestClassifier(n_estimators=1000)
    rfc.fit(X_balanced, y_balanced)
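To see how well this baseline performs, a quick evaluation on the held-out test set (an assumed addition, using the f1_score import from above) could look like this:

    # Assumed evaluation step: predict on the held-out test set and compute the F1 score
    y_pred = rfc.predict(X_test)
    print(f"F1 score on the test set: {f1_score(y_test, y_pred):.3f}")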
When you run this code, you will find that the model performance could be improved. This, along with the need to streamline the preceding code, improve model scalability, and build a solution that can interact with the streaming pipeline, will be the focus of the ML engineer's work for this project. There will also be some subtleties around how often you want to retrain your algorithm to make sure that the classifier does not go stale. We will discuss all of these topics later in this book. Taken together, the outline of the processing steps needed in the solution gives a high-level system design like that in Figure 1.9:
Figure 1.9 – Example 3 workflow
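To give a flavor of where the ML engineer's work is headed, here is a minimal sketch of how the trained classifier might be wrapped for streamed scoring. The use of the kafka-python client, the topic names, and the message format are all hypothetical; enc, scaler, and rfc are the fitted transformers and model from the steps above, which in practice would be persisted and reloaded (for example, with joblib) rather than trained in the same process:

    import json

    import numpy as np
    import pandas as pd
    from kafka import KafkaConsumer, KafkaProducer

    consumer = KafkaConsumer(
        'user-activity',  # hypothetical input topic
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

    for message in consumer:
        record = message.value
        # Apply the same transformations used at training time
        row = pd.DataFrame([record])
        cat = enc.transform(row[cat_feature_cols]).toarray()
        num = scaler.transform(row[num_feature_cols])
        features = np.concatenate((cat, num), axis=1)

        # Score the incoming record and publish the result for the data store
        label = int(rfc.predict(features)[0])
        producer.send('user-classifications', {'id': record.get('id'), 'label': label})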
We have now explored three high-level ML system designs and discussed the rationale behind our workflow choices. We have also explored in detail the sort of code that would often be produced by data scientists working on modeling, but which would act as input to future ML engineering work. This section should therefore have given us an appreciation of where our engineering work begins in a typical project and what types of problems we will be aiming to solve. And there you go. You are already on your way to becoming an ML engineer!