Real-time data formats and importing an example dataset in Python
To finalize this chapter, let's have a look at how to represent streaming data in practice. After all, when building analytics, we will often have to implement test cases and example datasets.
The simplest way to represent streaming data in Python would be to create an iterable object that contains the data and to build your analytics function to work with an iterable.
The following code creates a DataFrame using pandas. There are two columns, temperature and pH:
Code block 1-1
import pandas as pd
data_batch = pd.DataFrame({
'temperature': [10, 11, 10, 11, 12, 11, 10, 9, 10, 11, 12, 11, 9, 12, 11],
‹pH›: [5, 5.5, 6, 5, 4.5, 5, 4.5, 5, 4.5, 5, 4, 4.5, 5, 4.5, 6]
})
print(data_batch)
When showing the DataFrame, it will look as follows. The pH is around 4.5/5 but is sometimes higher. The temperature is generally around 10 or 11.
This dataset is a batch dataset; after all, you have all the rows (observations) at the same time. Now, let's see how to convert this dataset to a streaming dataset by making it iterable.
You can do this by iterating through the data's rows. When doing this, you set up a code structure that allows you to add more building blocks to this code one by one. When your developments are done, you will be able to use your code on a real-time stream rather than on an iteration of a DataFrame.
The following code iterates through the rows of the DataFrame and converts the rows to JSON format. This is a very common format for communication between different systems. The JSON of the observation contains a value for temperature and a value for pH. Those are printed out as follows:
Code block 1-2
data_iterable = data_batch.iterrows()
for i,new_datapoint in data_iterable:
print(new_datapoint.to_json())
After running this code, you should obtain a print output that looks like the following:
Let's now define a super simple example of streaming data analytics. The function that is defined in the following code block will print an alert whenever the temperature gets below 10:
Code block 1-3
def super_simple_alert(datapoint):
if datapoint[‹temperature›] < 10:
print('this is a real time alert. temp too low')
You can now add this alert into your simulated streaming process simply by calling the alerting test at every data point. You can use the following code to do this:
Code block 1-4
data_iterable = data_batch.iterrows()
for i,new_datapoint in data_iterable:
print(new_datapoint.to_json())
super_simple_alert(new_datapoint)
When executing this code, you will notice that alerts will be given as soon as the temperature goes below 10:
This alert works only on the temperature, but you could easily add the same type of alert on pH. The following code shows how this can be done. The alert function could be updated to include a second business rule as follows:
Code block 1-5
def super_simple_alert(datapoint):
if datapoint[‹temperature›] < 10:
print('this is a real time alert. temp too low')
if datapoint[‹pH›] > 5.5:
print('this is a real time alert. pH too high')
Executing the function would still be done in exactly the same way:
Code block 1-6
data_iterable = data_batch.iterrows()
for i,new_datapoint in data_iterable:
print(new_datapoint.to_json())
super_simple_alert(new_datapoint)
You will see several alerts being raised throughout the execution on the example streaming data, as follows:
With streaming data, you have to decide without seeing the complete data but just on those data points that have been received in the past. This means that there is a need for a different approach to redeveloping algorithms that are similar to batch processing algorithms.
Throughout this book, you will discover methods that apply to streaming data. The difficulty, as you may understand, is that a statistical method is generally developed to compute things using all the data.