After this short introduction to ReactiveX and RxPY, the time has come to see some concrete code and write a first example. This first RxPY application is a command line interface (CLI) program that echoes the parameters that are provided as input. Save the following code in a file called echo1.py, or use the echo1.py script from the Git repository of this book, as shown in the following code:
import sys
from rx import Observable
argv = Observable.from_(sys.argv[1:])
argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))
Ensure that you are running in virutalenv, as shown in the following code:
$ source venv-rx/bin/activate
And when you run it, you should see the following output:
(venv-rx)$ python3 echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
We ran the program with three parameters (hello, world, and !), and it printed these three parameters as well as information on the end of Observable. Let's detail each line of this program. We will start by importing the modules that we will use, as in the following code:
import sys
from rx import Observable
The sys module allows us to access the command line arguments. The rx module is the name of the RxPY package, which we installed from pip. We do not import the complete rx module, but just the Observable class. In many cases we will only need this class, or a few other ones. Then we can create an observable from the command line arguments, as in the following example:
argv = Observable.from_(sys.argv[1:])
sys.argv is a list containing the command line arguments that were used to run the program. The first argument is the name of the script being executed. In this case its value is echo1.py. Since we do not want to use this argument we omit it with a slice, using the second up to the last argument of the list. An observable is created from this list with the from_ creation operator. This operator creates an observable from a Python iterable object, which is the case of our argument list. We affect the reference of this observable to the argv variable. So, argv is a reference to an observable that will emit items containing the arguments provided on the command line, one item per argument. After this affectation the observable is created, but does not emit any item yet; items are emitted only once the observable is subscribed. On the last part of the program, we subscribe to this observable and print text depending on the event being received, as can be seen in the following code:
argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))
Three callback arguments are provided to the subscribe method: on_next, on_error, and on_completed. They are all optional, and they correspond to the reception of the associated events. As already explained, the on_next callback will be called zero or more times, and the on_error and on_completed callbacks can be called once at the most (and never if the observable never ends, which is not the case here). The call to the subscribe method is the one that makes the argv observable start emitting items. In this simple application, the code of each callback is very simple, so we use lambda instead of functions.
Lambdas are anonymous functions; that is, functions that can be referenced only from where they are defined, because they have no name. However, lambdas have restrictions over functions, which makes them only suitable when simple manipulations are done with the data:
- Lambdas can use only expressions, not statements
- Lambdas contain only one expression
- Lambdas cannot declare or use local variables
So, lambdas are very useful when you need to do an action on one or several input parameters. For more complex logic, writing a function is mandatory. Lambdas are used a lot when developing RxPY code because many operators take functions as input. So, such operators are functions that accept functions as input parameters. Functions that accept functions as input are called higher order functions in functional programming and this is another aspect of functional programming used a lot in ReactiveX.
As you can see, the on_next callback is called once for each argument provided on the command line, and the on_completed callback is called right after. In this example application, we use a synchronous Observable. In practice, this means that all items are emitted in the context of the subscribe call. To confirm this, add another print statement after the subscribe call:
argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))
print("done")
Then run the program again. You should see the following output:
(venv-rx)$ python3 ch1/echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
done
As you can see, the done print is displayed after the observable completes because the observable emits all its items during the call to subscribe.
We will now add some functionality to this echo application. Instead of simply printing each argument, we will print them with the first letter in uppercase. This is the typical case of an action that must be applied to each item of an observable. In the current code, there are two possible locations to do it:
- Either by using an operator on the argv observable
- By modifying the on_next callback in the subscribe call
In a real application, there will usually be only a single place where the action must be applied, depending on whether the action must be done in the observer or on the observable directly. Implementing the action in the observer allows you to isolate the change to this single observer. The other way of implementing the action on the observable allows you to share this behavior with several observers. Here we will implement the action on the observable with the map operator. Modify the code as in the following example, or use the echo2.py script from the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of the book:
argv = Observable.from_(sys.argv[1:]) \
.map(lambda i: i.capitalize())
The map operator takes an observable as input, applies a transformation function on each item of this observable, and returns an observable which contains all input items, with the transformation applied to them. Here we have used lambda that returns the item (a string) capitalized; that is, with its first letter in uppercase. If you run this new code you should get the following output:
(venv-rx)$ python3 ch1/echo2.py hello world !
on_next: Hello
on_next: World
on_next: !
on_completed
done
As you can see, the output is the same, but with capitalized names. Congratulations; you have just written your first reactive application!