Search icon CANCEL
Subscription
0
Cart icon
Your Cart (0 item)
Close icon
You have no products in your basket yet
Save more on your purchases! discount-offer-chevron-icon
Savings automatically calculated. No voucher code required.
Arrow left icon
Explore Products
Best Sellers
New Releases
Books
Videos
Audiobooks
Learning Hub
Free Learning
Arrow right icon
Arrow up icon
GO TO TOP
Hands-On Reactive Programming with Python

You're reading from   Hands-On Reactive Programming with Python Event-driven development unraveled with RxPY

Arrow left icon
Product type Paperback
Published in Oct 2018
Publisher Packt
ISBN-13 9781789138726
Length 420 pages
Edition 1st Edition
Languages
Tools
Arrow right icon
Author (1):
Arrow left icon
Romain Picard Romain Picard
Author Profile Icon Romain Picard
Romain Picard
Arrow right icon
View More author details
Toc

Table of Contents (16) Chapters Close

Preface 1. An Introduction to Reactive Programming FREE CHAPTER 2. Asynchronous Programming in Python 3. Functional Programming with ReactiveX 4. Exploring Observables and Observers 5. Concurrency and Parallelism in RxPY 6. Implementation of an Audio Transcoding Server 7. Using Third-Party Services 8. Dynamic Reconfiguration and Error Management 9. Operators in RxPY 10. Testing and Debugging 11. Deploying and Scaling Your Application 12. Reactive Streams for Remote Communication 13. A Checklist of Best Practices 14. Assessments 15. Other Books You May Enjoy

A reactive echo application

After this short introduction to ReactiveX and RxPY, the time has come to see some concrete code and write a first example. This first RxPY application is a command line interface (CLI) program that echoes the parameters that are provided as input. Save the following code in a file called echo1.py, or use the echo1.py script from the Git repository of this book, as shown in the following code:

import sys
from rx import Observable

argv = Observable.from_(sys.argv[1:])

argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))

Ensure that you are running in virutalenv, as shown in the following code:

$ source venv-rx/bin/activate

And when you run it, you should see the following output:

(venv-rx)$ python3 echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed

We ran the program with three parameters (hello, world, and !), and it printed these three parameters as well as information on the end of Observable. Let's detail each line of this program. We will start by importing the modules that we will use, as in the following code:

import sys
from rx import Observable

The sys module allows us to access the command line arguments. The rx module is the name of the RxPY package, which we installed from pip. We do not import the complete rx module, but just the Observable class. In many cases we will only need this class, or a few other ones. Then we can create an observable from the command line arguments, as in the following example:

argv = Observable.from_(sys.argv[1:])

sys.argv is a list containing the command line arguments that were used to run the program. The first argument is the name of the script being executed. In this case its value is echo1.py. Since we do not want to use this argument we omit it with a slice, using the second up to the last argument of the list. An observable is created from this list with the from_ creation operator. This operator creates an observable from a Python iterable object, which is the case of our argument list. We affect the reference of this observable to the argv variable. So, argv is a reference to an observable that will emit items containing the arguments provided on the command line, one item per argument. After this affectation the observable is created, but does not emit any item yet; items are emitted only once the observable is subscribed. On the last part of the program, we subscribe to this observable and print text depending on the event being received, as can be seen in the following code:

argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))

Three callback arguments are provided to the subscribe method: on_next, on_error, and on_completed. They are all optional, and they correspond to the reception of the associated events. As already explained, the on_next callback will be called zero or more times, and the on_error and on_completed callbacks can be called once at the most (and never if the observable never ends, which is not the case here). The call to the subscribe method is the one that makes the argv observable start emitting items. In this simple application, the code of each callback is very simple, so we use lambda instead of functions.

Lambdas are anonymous functions; that is, functions that can be referenced only from where they are defined, because they have no name. However, lambdas have restrictions over functions, which makes them only suitable when simple manipulations are done with the data:

  • Lambdas can use only expressions, not statements
  • Lambdas contain only one expression
  • Lambdas cannot declare or use local variables

So, lambdas are very useful when you need to do an action on one or several input parameters. For more complex logic, writing a function is mandatory. Lambdas are used a lot when developing RxPY code because many operators take functions as input. So, such operators are functions that accept functions as input parameters. Functions that accept functions as input are called higher order functions in functional programming and this is another aspect of functional programming used a lot in ReactiveX.

As you can see, the on_next callback is called once for each argument provided on the command line, and the on_completed callback is called right after. In this example application, we use a synchronous Observable. In practice, this means that all items are emitted in the context of the subscribe call. To confirm this, add another print statement after the subscribe call:

argv.subscribe(
on_next=lambda i: print("on_next: {}".format(i)),
on_error=lambda e: print("on_error: {}".format(e)),
on_completed=lambda: print("on_completed"))
print("done")

Then run the program again. You should see the following output:

(venv-rx)$ python3 ch1/echo1.py hello world !
on_next: hello
on_next: world
on_next: !
on_completed
done

As you can see, the done print is displayed after the observable completes because the observable emits all its items during the call to subscribe.

We will now add some functionality to this echo application. Instead of simply printing each argument, we will print them with the first letter in uppercase. This is the typical case of an action that must be applied to each item of an observable. In the current code, there are two possible locations to do it:

  • Either by using an operator on the argv observable
  • By modifying the on_next callback in the subscribe call

In a real application, there will usually be only a single place where the action must be applied, depending on whether the action must be done in the observer or on the observable directly. Implementing the action in the observer allows you to isolate the change to this single observer. The other way of implementing the action on the observable allows you to share this behavior with several observers. Here we will implement the action on the observable with the map operator. Modify the code as in the following example, or use the echo2.py script from the GitHub repository (https://github.com/PacktPublishing/Hands-On-Reactive-Programming-with-Python) of the book:

argv = Observable.from_(sys.argv[1:]) \
.map(lambda i: i.capitalize())

The map operator takes an observable as input, applies a transformation function on each item of this observable, and returns an observable which contains all input items, with the transformation applied to them. Here we have used lambda that returns the item (a string) capitalized; that is, with its first letter in uppercase. If you run this new code you should get the following output:

(venv-rx)$ python3 ch1/echo2.py hello world !
on_next: Hello
on_next: World
on_next: !
on_completed
done

As you can see, the output is the same, but with capitalized names. Congratulations; you have just written your first reactive application!

You have been reading a chapter from
Hands-On Reactive Programming with Python
Published in: Oct 2018
Publisher: Packt
ISBN-13: 9781789138726
Register for a free Packt account to unlock a world of extra content!
A free Packt account unlocks extra newsletters, articles, discounted offers, and much more. Start advancing your knowledge today.
Unlock this book and the full library FREE for 7 days
Get unlimited access to 7000+ expert-authored eBooks and videos courses covering every tech area you can think of
Renews at $19.99/month. Cancel anytime
Banner background image