Learning Concurrency in Python

Build highly efficient, robust, and concurrent applications

Product type: Paperback
Published in: Aug 2017
Publisher: Packt
ISBN-13: 9781787285378
Length: 360 pages
Edition: 1st Edition
Author: Elliot Forbes
Table of Contents (13 chapters)

Preface
1. Speed It Up!
2. Parallelize It
3. Life of a Thread
4. Synchronization between Threads
5. Communication between Threads
6. Debug and Benchmark
7. Executors and Pools
8. Multiprocessing
9. Event-Driven Programming
10. Reactive Programming
11. Using the GPU
12. Choosing a Solution

Reactive programming

Reactive programming is very similar to event-driven programming, but instead of revolving around events, it focuses on data. More specifically, it deals with streams of data and reacts to specific changes in that data.

ReactiveX - RxPy

RxPy is the Python implementation of the very popular ReactiveX framework. If you've ever done any programming in Angular 2 or later versions, then you will have used the JavaScript equivalent when interacting with HTTP services. The framework combines the observer pattern, the iterator pattern, and functional programming. We essentially subscribe to different streams of incoming data, and then create observers that listen for specific events being triggered. When these observers are triggered, they run the code that corresponds to what has just happened.
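To make the observer-pattern part of that description concrete, here is a minimal plain-Python sketch that does not use RxPy at all. The Stream class and its subscribe and publish methods are hypothetical names chosen for illustration; RxPy's real classes are considerably richer.

```python
class Stream:
    """A toy data stream: observers subscribe, values are pushed to them."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        # Register a function to be called with every published value
        self._callbacks.append(callback)

    def publish(self, value):
        # Push the value to every subscriber, in subscription order
        for callback in self._callbacks:
            callback(value)


received = []
doubled = []

stream = Stream()
# First subscriber simply records each reading
stream.subscribe(received.append)
# Second subscriber applies a small transformation before recording
stream.subscribe(lambda value: doubled.append(value * 2))

for reading in [18, 21, 25]:
    stream.publish(reading)

print(received)  # [18, 21, 25]
print(doubled)   # [36, 42, 50]
```

The point of the sketch is the direction of control: the subscribers never ask for data, the stream pushes each value to them as it arrives.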

We'll take a data center as a good example of how reactive programming can be utilized. Imagine this data center has thousands of server racks, all constantly performing millions upon millions of calculations. One of the biggest challenges in these data centers is keeping all these tightly packed server racks cool enough that they don't damage themselves. We could set up multiple thermometers throughout our data center to ensure that we aren't getting too hot anywhere, and send the readings from these thermometers to a central computer as a continuous stream.

Within our central control station, we could set up an RxPy program that observes this continuous stream of temperature information. Within these observers, we could then define a series of conditions to listen out for, and react whenever one of these conditions is met.

One such example would be an event that only triggers if the temperature for a specific part of the data center gets too warm. When this event is triggered, we could then automatically react and increase the flow of any cooling system to that particular area, and thus bring the temperature back down again:

import rx
from rx import Observable, Observer

# Note: this listing is written against the RxPy 1.x API;
# later releases changed these imports

# Here we define our custom observer, which
# contains an on_next method, an on_error method,
# and an on_completed method
class temperatureObserver(Observer):

    # Every time we receive a temperature reading,
    # this method is called
    def on_next(self, x):
        print("Temperature is: %s degrees centigrade" % x)
        if x > 6:
            print("Warning: Temperature Is Exceeding Recommended Limit")
        if x >= 9:
            print("DataCenter is shutting down. Temperature is too high")

    # If we were to receive an error message,
    # we would handle it here
    def on_error(self, e):
        print("Error: %s" % e)

    # This is called when the stream is finished
    def on_completed(self):
        print("All Temps Read")

# Publish some fake temperature readings
xs = Observable.from_iterable(range(10))
# Subscribe to these temperature readings
d = xs.subscribe(temperatureObserver())

Breaking it down

The first two lines of our code import the rx module and, from it, both the Observable and Observer classes.

We then go on to create a temperatureObserver class that extends Observer. This class contains three methods:

  • on_next: This is called every time our observer observes something new
  • on_error: This acts as our error-handler function; every time we observe an error, this function will be called
  • on_completed: This is called when our observer reaches the end of the stream of information it has been observing
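The contract between a stream and these three methods can be sketched in plain Python. The feed function and RecordingObserver class below are hypothetical helpers, not part of RxPy; they only mimic the usual ordering of zero or more on_next calls followed by either one on_error or one on_completed, never both.

```python
class RecordingObserver:
    """Records every notification it receives so the order can be inspected."""

    def __init__(self):
        self.events = []

    def on_next(self, value):
        self.events.append(("next", value))

    def on_error(self, error):
        self.events.append(("error", str(error)))

    def on_completed(self):
        self.events.append(("completed",))


def feed(source, observer):
    """Push each item of an iterable to the observer (hypothetical helper)."""
    try:
        for item in source:
            observer.on_next(item)
    except Exception as error:
        # Something failed while producing or handling a value:
        # report the error and do NOT signal completion
        observer.on_error(error)
    else:
        # The source was exhausted without any error
        observer.on_completed()


def broken_sensor():
    # A generator that yields one reading and then fails
    yield 5
    raise ValueError("sensor offline")


ok = RecordingObserver()
feed([1, 2], ok)

failed = RecordingObserver()
feed(broken_sensor(), failed)

print(ok.events)      # [('next', 1), ('next', 2), ('completed',)]
print(failed.events)  # [('next', 5), ('error', 'sensor offline')]
```

Note that the failing stream ends with an error notification and never reports completion, which is why an observer usually needs both handlers.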

In the on_next method, we print out the current temperature and check whether the temperature we have received exceeds a set of limits. If the temperature exceeds one of these limits, then we handle it slightly differently and print out a descriptive warning about what has happened.

After our class declaration, we create a fake observable containing 10 separate values using Observable.from_iterable(). Finally, the last line of the code subscribes an instance of our new temperatureObserver class to this observable.
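The observer in the listing only prints messages. As a rough sketch of the reaction described earlier, where cooling is increased for the area that is getting too warm, the threshold logic could be pulled into a small function that returns an action for each reading. Everything here is an assumption for illustration: the zone names, the decide_action and react functions, the action names, and the reuse of the listing's limits of 6 and 9.

```python
WARNING_LIMIT = 6   # assumed: same warning threshold as the listing
SHUTDOWN_LIMIT = 9  # assumed: same shutdown threshold as the listing


def decide_action(temperature):
    """Map one temperature reading to a hypothetical action name."""
    if temperature >= SHUTDOWN_LIMIT:
        return "shut_down"
    if temperature > WARNING_LIMIT:
        return "increase_cooling"
    return "no_action"


def react(readings):
    """Return the actions needed for a stream of (zone, temperature) readings."""
    actions = []
    for zone, temperature in readings:
        action = decide_action(temperature)
        if action != "no_action":
            actions.append((zone, action))
    return actions


# Fake readings from two hypothetical zones of the data center
readings = [("rack-a", 4), ("rack-b", 7), ("rack-a", 6), ("rack-b", 9)]
actions = react(readings)
print(actions)  # [('rack-b', 'increase_cooling'), ('rack-b', 'shut_down')]
```

Keeping the decision in a plain function like this means it can be called from an observer's on_next method and also checked on its own, without a live stream.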
