ReactiveX is based on two entities: observables and observers. These are the only things that one needs to understand to be able to start writing code. Everything else is based on the behavior of one of these two entities.
Observables represent a source of events. An observable is an entity that can emit zero or one of several items. An observable has an explicit lifetime with a start and an end. When an observable completes or faces an error, it cannot send items anymore; its lifetime has ended. An observable may never end. In this case, it is an infinite source of events. Observables are a way to manage sequences of items in an asynchronous way. Table 1.1, which follows, shows a comparison between how to access items in a synchronous or asynchronous way. As you can see, observables fill a gap and are allowed to operate on multiple items in an asynchronous way.
|
Single item |
Multiple items |
Synchronous |
Getter |
Iterable |
Asynchronous |
Future |
Observable |
Table 1.1: Accessing an asynchronous sequence of items if possible
Observables work in push mode, as opposed to the pull mode of an iterable. Each time a new item is available, the observable pushes it to its observer. Table 1.2 shows the difference between the pull mode of an iterator and the push mode of an observable. This is what makes the behavior reactive and easy to handle with asynchronous code: whether items are emitted immediately or later is not important to the observer receiving it, and the code semantic is very similar to the one used in synchronous code:
Event |
Iterable (pull) |
Observable (push) |
Retrieve data |
For i in collection
|
on_next
|
Error |
Exception is raised |
on_error
|
Complete |
End of loop |
on_completed
|
Table 1.2 : Observables are push based
Observers are the receiving part of the items. An observer subscribes to an observable so that it can receive items emitted by this observable. Just as the observable emits items one after another, an observer receives them one after another. The observable informs the observer of the end of the sequence, either by indicating that the observable has completed (successfully) or by indicating that an error has occurred. These two kinds of completion are notified in a similar way, and so can be handled in a similar way. With ReactiveX, the error management is not a special case, but on a par with the items and completion management. In contrast to iterables that use exceptions, there is no radically different way of handling success from failure.
The implementation of RxPY (as well as all other implementations of ReactiveX) involves two other entities: a subscription function and a disposable object. Figure 1.5 shows a simplified representation of these entities. The AnonymousObservable class is the class that is almost always used to create an observable (directly or via another subclass). This class contains two methods to manage the lifetime of the observable and its observer. The first one, init, is not even a method but the constructor of the class. It takes a subscription function as an input argument. This subscription function will be called when an observer subscribes to this observable. The observable constructor returns a disposable function that can be called to free all resources used by the observable and observer. The second method of the AnonymousObservable class is subscribe. This is the method that is used to attach an observer to an observable and start the observable; that is, to make it start emitting items. The AnonymousObservable class can be used directly, but there are many cases where using an existing RxPY AnonymousObservable subclass is easier. This is typically the case when you need to create an observable from an iterable object or a single object.
The Observer class is a base class that contains three methods. They correspond to the behavior explained previously. This class must be subclassed to implement these three methods. The method on_next is called each time an item is emitted by the observable. The method on_completed is called when the observable completes successfully. Finally, the method on_error is called when the observable completes because of an error. The on_item method will never be called after the on_completed or the on_error methods.
The subscription entity is a function that takes an observer as input parameter. This function is called when the subscribe method of the observable is called. This is where the emission of items is implemented. The emission of these items can be either synchronous or asynchronous. Items are emitted in a synchronous way if the subscription function directly calls the on_items methods of the observable. But items can also be emitted asynchronously if the observer instance is saved and used later (after the subscription function returns). The subscription function can return a Disposable object or function. This Disposable object will be called when the observable is being disposed.
Finally, the Disposable class and its associated dispose function are used to clean up any resources used by an Observer or a subscription. In the case of an asynchronous observable, this is how the subscription function is notified that it must stop emitting items, because Observer is no more valid after that. The following figure shows these components:
Figure 1.5: RxPY components
Let's try to make more sense of all these definitions. The following figure shows a sequence diagram of how these calls are organized when an observable is created, subscribed, and finally disposed:
Figure 1.6: RxPY dynamics. An example of the creation of a synchronous observable and its subscription and disposal
First the application creates an AnonymousObservable and provides the subscription function associated to this observable. It then creates an observer object (actually a subclass of Observer). After that, the observable is subscribed, with reference to the observer object provided as an input parameter. During the call to subscribe, the subscription function is called. In this example, the subscription function is synchronous: it emits three items (the integers 1, 2, and 3), and completes the observable. When the subscription function returns, the observable is already completed. At that point, the subscribe method of AnonymousObservable returns the Disposable function to the application. The application finally calls this dispose function to clean up any resource still used by subscription and observer.