This two-part series explores asynchronous programming with Python using Asyncio. In Part 1 of this series, we started by building a project that shows how you can use Reactive Python in asynchronous programming. Let’s pick it back up here by exploring peer-to-peer communication and then just touching on service discovery before examining the streaming machine-to-machine concept.
So far we’ve established a websocket connection to process clock events asynchronously. Now that one pin swings between 1's and 0's, let's wire a buzzer and pretend it buzzes on high states (1) and remains silent on low ones (0). We can rephrase that in Python, like so:
# filename: sketches.py

import factory


class Buzzer(factory.FactoryLoop):
    """Buzz on light changes."""

    def setup(self, sound):
        # customize buzz sound
        self.sound = sound

    @factory.reactive
    async def loop(self, channel, signal):
        """Buzzing."""
        behavior = self.sound if signal == '1' else '...'
        self.out('signal {} received -> {}'.format(signal, behavior))
        return behavior
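Stripped of the factory plumbing, the buzzing rule is a plain function of the received signal. The sketch below is only an illustration: `buzz_behavior` is a hypothetical helper, not part of the project, and it assumes signals arrive as text, which is what the comparison with `'1'` in `loop` suggests.

```python
def buzz_behavior(signal, sound='buzz buzz buzz'):
    """Return what the buzzer would emit for one received signal."""
    # assumption: signals arrive as strings, so only the text '1' counts as high
    return sound if signal == '1' else '...'


if __name__ == '__main__':
    # the integer 1 is deliberately included: it is treated as a low state
    for signal in ['1', '0', 1]:
        print('signal {!r} received -> {}'.format(signal, buzz_behavior(signal)))
```

Anything other than the string `'1'`, including the integer `1`, keeps the buzzer silent, so senders have to serialize their bits consistently.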
So how do we make IOPin and Buzzer communicate? Since they share a common parent class, we can implement a stream method there that sends arbitrary data and receives an equally arbitrary acknowledgement in return. To sum up, we want IOPin to use this API:
class IOPin(factory.FactoryLoop):
    # [ ... ]

    @factory.reactive
    async def loop(self, channel, msg):
        # [ ... ]
        await self.stream('buzzer', bits_stream)
        return 'acknowledged'
The first challenge to solve is service discovery: we need to target specific nodes within a fleet of reactive workers.
This topic, however, is beyond the scope of this series. The shortcut below, which hardcodes the nodes we will start, does the job while keeping us focused on reactive messaging.
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# filename: mesh.py

"""Provide nodes network knowledge."""

import websockets


class Node(object):

    def __init__(self, name, socket, port):
        print('[ mesh ] registering new node: {}'.format(name))
        self.name = name
        self._socket = socket
        self._port = port

    def uri(self, path):
        return 'ws://{socket}:{port}/{path}'.format(socket=self._socket,
                                                    port=self._port,
                                                    path=path)

    def connection(self, path=''):
        # instantiate the same connection as the `clock` method
        return websockets.connect(self.uri(path))


# TODO service discovery
def grid():
    """Discover and build nodes network."""
    # of course a proper service discovery should be used here
    # see consul or zookeeper for example

    # note: clock is not a server so it doesn't need a port
    return [
        Node('clock', 'localhost', None),
        Node('blink', 'localhost', 8765),
        Node('buzzer', 'localhost', 8765 + 1)
    ]
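To make the hardcoded grid concrete, here is a minimal, dependency-free sketch of the URIs it produces and of a lookup by name. The `Node` tuple, `GRID`, `uri` and `find` are hypothetical stand-ins written for this illustration; they mirror mesh.py but leave out websockets so the snippet runs on its own.

```python
from collections import namedtuple

# hypothetical stand-in for mesh.Node, without the websockets dependency
Node = namedtuple('Node', ['name', 'host', 'port'])

GRID = [
    Node('clock', 'localhost', None),
    Node('blink', 'localhost', 8765),
    Node('buzzer', 'localhost', 8765 + 1),
]


def uri(node, path=''):
    """Build the websocket URI with the same template as Node.uri."""
    return 'ws://{}:{}/{}'.format(node.host, node.port, path)


def find(name, grid=GRID):
    """Return the node registered under `name`, or None when it is unknown."""
    return next((node for node in grid if node.name == name), None)


if __name__ == '__main__':
    print(uri(find('buzzer')))
    print(find('missing'))
```

Passing a default to `next` is a design choice of this sketch: an unknown name yields `None` instead of raising `StopIteration`, which tends to be easier to handle in a worker.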
Let's provide FactoryLoop with the knowledge of the grid and implement an asynchronous communication channel.
# filename: factory.py (continued)

import mesh


class FactoryLoop(object):

    def __init__(self, *args, **kwargs):
        # now every instance will know about the other ones
        self.grid = mesh.grid()
        # ...

    def node(self, name):
        """Search for the given node in the grid."""
        return next(filter(lambda x: x.name == name, self.grid))

    # channel defaults to the root path so callers may omit it
    async def stream(self, target, data, channel=''):
        self.out('starting to stream message to {}'.format(target))
        # use the node websocket connection defined in mesh.py
        # the method is exactly the same as the clock
        async with self.node(target).connection(channel) as ws:
            for partial in data:
                self.out('> sending payload: {}'.format(partial))
                # websockets requires bytes or strings
                await ws.send(str(partial))
                self.out('< {}'.format(await ws.recv()))
We added a few debugging lines to better understand how the data flows through the network. Every implementation of FactoryLoop can now both react to events and communicate with the other nodes it is aware of.
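The send-then-wait-for-acknowledgement rhythm of `stream` can be tried without any network. The sketch below is an assumption-laden stand-in, not the project's code: two `asyncio.Queue` objects play the role of the websocket, and the `buzzer`, `stream`, `demo` and `run_demo` functions are hypothetical names chosen for this illustration.

```python
import asyncio


async def buzzer(inbox, outbox, sound='buzz'):
    """Stand-in for the Buzzer node: answer every signal until None arrives."""
    while True:
        signal = await inbox.get()
        if signal is None:  # sentinel: the sender has finished
            break
        await outbox.put(sound if signal == '1' else '...')


async def stream(data, inbox, outbox):
    """Send each item and wait for its acknowledgement before the next one."""
    replies = []
    for partial in data:
        await inbox.put(str(partial))       # plays the role of ws.send(str(partial))
        replies.append(await outbox.get())  # plays the role of ws.recv()
    await inbox.put(None)                   # tell the receiver to stop
    return replies


async def demo(bits):
    # the two queues replace the two directions of the websocket
    to_buzzer, from_buzzer = asyncio.Queue(), asyncio.Queue()
    worker = asyncio.ensure_future(buzzer(to_buzzer, from_buzzer))
    replies = await stream(bits, to_buzzer, from_buzzer)
    await worker
    return replies


def run_demo(bits):
    return asyncio.run(demo(bits))


if __name__ == '__main__':
    print(run_demo([1, 0, 1]))
```

Because each payload waits for its reply, the sender never gets ahead of the receiver. That keeps the example simple, at the cost of one round trip per item.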
Time to update arduino.py so that we can run our cluster of three reactive workers:
@click.command()
# [ ... ]
def main(sketch, **flags):
    # [ ... ]
    elif sketch == 'buzzer':
        sketches.Buzzer(sound='buzz buzz buzz').run(flags['socket'], flags['port'])
Launch three terminals or use a tool such as foreman to spawn multiple processes. Either way, keep in mind that you will need to track each script's output.
$ # start IOPin and Buzzer on the same ports we hardcoded in mesh.py
$ ./arduino.py buzzer --port 8766
$ ./arduino.py iopin --port 8765
$ # now that they listen, trigger actions with the clock (targeting IOPin port)
$ ./arduino.py clock --port 8765
[ ... ]
$ # Profit !
We just saw one worker reacting to a clock and another reacting to randomly generated events. The websocket protocol allowed us to exchange streaming data and receive arbitrary responses, which is the basic building block for orchestrating a fleet of workers. While we limited this example to two nodes, a proper service discovery mechanism could turn it into a distributed network of microservices.
By completing this post series, you should now have a better understanding of how to use Python with Asyncio for asynchronous programming.
Xavier Bruhiere is a lead developer at AppTurbo in Paris, where he develops innovative prototypes to support company growth. He is addicted to learning, hacking on intriguing hot techs (both soft and hard), and practicing high-intensity sports.