Reactive programming is very similar to event-driven programming, but instead of revolving around events, it focuses on data. More specifically, it deals with streams of data and reacts to specific changes in that data.
Reactive programming
ReactiveX - RxPy
RxPy is the Python implementation of the very popular ReactiveX framework. If you've ever done any programming in Angular 2 or later versions, then you will have used its JavaScript counterpart, RxJS, when interacting with HTTP services. This framework combines the observer pattern, the iterator pattern, and functional programming. We essentially subscribe to different streams of incoming data, and then create observers that listen for specific events being triggered. When these observers are triggered, they run the code that corresponds to what has just happened.
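As a rough illustration of how those three ideas fit together, the following sketch uses plain Python rather than RxPy itself. The SimpleStream class and the only_if helper are hypothetical names invented for this example; they are not part of the RxPy API.

```python
from typing import Callable, Iterable, List


class SimpleStream:
    """Hypothetical stand-in for an observable; not part of RxPy."""

    def __init__(self, source: Iterable[int]) -> None:
        # Iterator pattern: any iterable can act as the data source
        self._source = source
        self._observers: List[Callable[[int], None]] = []

    def subscribe(self, callback: Callable[[int], None]) -> None:
        # Observer pattern: register a callback to be told about each item
        self._observers.append(callback)

    def run(self) -> None:
        # Walk the source and push every item to every subscriber
        for item in self._source:
            for notify in self._observers:
                notify(item)


def only_if(predicate, callback):
    # Functional programming: build a new callback out of two functions
    def wrapped(item):
        if predicate(item):
            callback(item)
    return wrapped


seen = []
evens = []
stream = SimpleStream(range(5))
stream.subscribe(seen.append)
stream.subscribe(only_if(lambda n: n % 2 == 0, evens.append))
stream.run()

print(seen)   # [0, 1, 2, 3, 4]
print(evens)  # [0, 2, 4]
```

Both subscribers receive the same stream, but the second one only reacts to the values it cares about, which is the behavior the observers in this section rely on.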
We'll take a data center as a good example of how reactive programming can be utilized. Imagine this data center has thousands of server racks, all constantly performing millions upon millions of calculations. One of the biggest challenges in these data centers is keeping all these tightly packed server racks cool enough so that they don't damage themselves. We could set up multiple thermometers throughout our data center to ensure that we aren't getting too hot anywhere, and send the readings from these thermometers to a central computer as a continuous stream.
Within our central control station, we could set up an RxPy program that observes this continuous stream of temperature information. Within these observers, we could then define a series of conditions to listen out for, and then react whenever one of these conditions is met.
One such example would be an event that only triggers if the temperature for a specific part of the data center gets too warm. When this event is triggered, we could then automatically react and increase the flow of any cooling system to that particular area, and thus bring the temperature back down again:
# Note: this listing uses the older RxPy 1.x API;
# later releases changed these imports
import rx
from rx import Observable, Observer


# Here we define our custom observer which
# contains an on_next method, an on_error method
# and an on_completed method
class temperatureObserver(Observer):

    # Every time we receive a temperature reading
    # this method is called
    def on_next(self, x):
        print("Temperature is: %s degrees centigrade" % x)
        if x > 6:
            print("Warning: Temperature Is Exceeding Recommended Limit")
        if x == 9:
            print("DataCenter is shutting down. Temperature is too high")

    # if we were to receive an error message
    # we would handle it here
    def on_error(self, e):
        print("Error: %s" % e)

    # This is called when the stream is finished
    def on_completed(self):
        print("All Temps Read")


# Publish some fake temperature readings
xs = Observable.from_iterable(range(10))
# subscribe to these temperature readings
d = xs.subscribe(temperatureObserver())
Breaking it down
The first two lines of our code import the rx module, and then import both Observable and Observer from it.
We then go on to create a temperatureObserver class that extends Observer. This class contains three methods:
- on_next: This is called every time our observer observes something new
- on_error: This acts as our error-handler function; every time we observe an error, this function will be called
- on_completed: This is called when our observer reaches the end of the stream of information it has been observing
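The order in which these three callbacks fire can be sketched without RxPy. In the example below, RecordingObserver, feed, and faulty_readings are hypothetical names made up for illustration; feed is only a simplified stand-in for what an observable does when an observer subscribes to it. The sketch assumes the usual ReactiveX contract, in which a stream ends with either on_completed or on_error, never both.

```python
class RecordingObserver:
    """Hypothetical observer that records which callbacks were invoked."""

    def __init__(self):
        self.calls = []

    def on_next(self, value):
        self.calls.append(("next", value))

    def on_error(self, error):
        self.calls.append(("error", str(error)))

    def on_completed(self):
        self.calls.append(("completed",))


def feed(source, observer):
    """Push each item of an iterable to the observer (simplified stand-in)."""
    try:
        for value in source:
            observer.on_next(value)
    except Exception as error:
        # An error ends the stream, so on_completed is not called afterwards
        observer.on_error(error)
        return
    observer.on_completed()


def faulty_readings():
    # Two good readings, then a simulated sensor failure
    yield 4
    yield 5
    raise RuntimeError("sensor offline")


ok = RecordingObserver()
feed([4, 5], ok)

failed = RecordingObserver()
feed(faulty_readings(), failed)

print(ok.calls)      # [('next', 4), ('next', 5), ('completed',)]
print(failed.calls)  # [('next', 4), ('next', 5), ('error', 'sensor offline')]
```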
In the on_next function, we print the current temperature and then check it against two limits. If the reading is above 6, we print a warning; if it equals 9, we print a message saying that the data center is shutting down.
After our class declaration, we create a fake observable that emits 10 values (0 through 9) using Observable.from_iterable(). The last line of the code then subscribes an instance of our new temperatureObserver class to this observable.
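The listing above only prints warnings. The reaction described earlier, increasing the cooling for the particular area that is too warm, might look something like the following sketch. It is written in plain Python so that it runs without RxPy, and everything in it is an assumption made for illustration: the CoolingController and ZoneTemperatureObserver classes, the zone names, and the idea that each reading arrives as a (zone, temperature) pair.

```python
from collections import defaultdict

WARN_LIMIT = 6  # same warning threshold as the temperature listing


class CoolingController:
    """Hypothetical cooling system that tracks a fan level per zone."""

    def __init__(self):
        self.fan_level = defaultdict(int)

    def boost(self, zone):
        # Raise the fan level for one zone by a single step
        self.fan_level[zone] += 1


class ZoneTemperatureObserver:
    """Hypothetical observer that reacts to (zone, temperature) readings."""

    def __init__(self, cooling, limit=WARN_LIMIT):
        self.cooling = cooling
        self.limit = limit

    def on_next(self, reading):
        zone, temperature = reading
        # React only when this zone's reading exceeds the limit
        if temperature > self.limit:
            self.cooling.boost(zone)


# Fake readings standing in for the continuous thermometer stream
readings = [
    ("rack-a", 5),
    ("rack-b", 7),
    ("rack-a", 6),
    ("rack-b", 8),
    ("rack-c", 9),
]

cooling = CoolingController()
observer = ZoneTemperatureObserver(cooling)
for reading in readings:
    observer.on_next(reading)

print(dict(cooling.fan_level))  # {'rack-b': 2, 'rack-c': 1}
```

Because the observer exposes the same on_next method as temperatureObserver, the same logic could be moved into an Observer subclass and subscribed to a real stream of readings.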