Reactive Python - Real-time events processing

  • 04 Oct 2016


A recent trend in programming literature promotes functional programming as a sensible alternative to object-oriented programs for many use cases. This subject feeds many discussions and highlights how important program design is as our applications become more and more complex. Although there might be some seductive intellectual challenge here (because yeah, we love to juggle with elegant abstractions), there is also real business value:

  • Building sustainable, maintainable programs
  • Decoupling architecture components for proper team work
  • Limiting bug exposure
  • Better product iteration

When developers spot an interesting approach to solving a recurrent issue in our industry, they formalize it as a design pattern. Today, we will discuss a powerful member of this family: the observer pattern. We won't dive into the strict theoretical details (sorry, not sorry). Instead, we will explore how reactive programming can level up the quality of our work.
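Stripped down to its essence, the observer pattern is a subject that keeps a list of observers and notifies each of them when something happens. Here is a minimal, illustrative sketch in plain Python (Subject, attach, detach and notify are names chosen for this example, not a library API):

```python
# Minimal observer pattern: a subject keeps a list of observers (plain
# callables here) and pushes every new event to each of them.
from typing import Any, Callable, List


class Subject:
    """Holds observers and notifies them; it knows nothing about what they do."""

    def __init__(self) -> None:
        self._observers: List[Callable[[Any], None]] = []

    def attach(self, observer: Callable[[Any], None]) -> None:
        self._observers.append(observer)

    def detach(self, observer: Callable[[Any], None]) -> None:
        self._observers.remove(observer)

    def notify(self, event: Any) -> None:
        # Iterate over a copy so observers may detach themselves safely.
        for observer in list(self._observers):
            observer(event)


received = []
alerts = Subject()
alerts.attach(received.append)
alerts.attach(lambda event: print('second observer got:', event))
alerts.notify('cpu_high')
```

The subject never inspects what its observers do, which is exactly the decoupling we are after; a message broker plays the same role across processes.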


The scene

That was a bold statement; let's illustrate it with a real-world scenario. Say we were tasked with building a monitoring system. We need some way to collect data, analyze it, and take action when things go wrong. Anomaly detection is an exciting yet challenging problem. We don't want our data scientists to be bothered by infrastructure failures. In the same spirit, we need other engineers to focus only on how to react to specific disaster scenarios.

The core of our approach consists of two components: a monitoring module that fires and forgets its discoveries on channels, and another processing brick that intercepts those events with an appropriate response. The UNIX philosophy at its best: do one thing and do it well. We split the infrastructure by concerns and the workers by event types. Assuming that our team defines well-documented interfaces, this is a promising design.
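To make the fire-and-forget idea concrete before bringing in a real broker, here is a hypothetical in-memory stand-in (InMemoryBroker is invented for illustration) where the monitoring side publishes on named channels and workers subscribe only to the event types they handle:

```python
# Hypothetical in-memory stand-in for a PUBSUB broker: publishers fire and
# forget on a named channel, workers subscribe to the event types they handle.
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str], None]


class InMemoryBroker:
    def __init__(self) -> None:
        self._channels: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, channel: str, handler: Handler) -> None:
        self._channels[channel].append(handler)

    def publish(self, channel: str, message: str) -> None:
        # Fire and forget: nothing is returned to the publisher, and a
        # message on a channel nobody listens to is simply dropped.
        for handler in self._channels.get(channel, []):
            handler(message)


bus = InMemoryBroker()
disk_alerts: List[str] = []
cpu_alerts: List[str] = []

# Workers are split by event type...
bus.subscribe('disk_full', disk_alerts.append)
bus.subscribe('cpu_high', cpu_alerts.append)

# ...while the monitoring module only knows channel names.
bus.publish('disk_full', '/var is 98% full')
bus.publish('cpu_high', 'load average above 12')
bus.publish('memory_leak', 'nobody listens, so this is dropped')
```

Swapping this class for a network broker is what lets publishers and workers live in different processes, or on different machines, without touching either side.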

The rest of the article will discuss the technical implementation, but keep in mind that I/O documentation and proper estimation of the processing load are also fundamental.

The strategy

Our local lab is composed of three elements:

  • The alert module, which we will emulate with a simple CLI tool that publishes alert messages.
  • The actual processing unit subscribing to events it knows how to react to.
  • A message broker supporting the Publish/Subscribe (or PUBSUB) pattern. For this purpose, Redis offers a popular, efficient, and rock-solid solution. It is highly recommended, but as a database it isn't primarily designed for this use case. NATS, however, presents itself as follows:

NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always on ‘dial-tone’.

Sounds promising! Client libraries are available for major languages, and Apcera, the company sponsoring the technology, has a solid reputation for building reliable distributed systems.

Again, we won't delve into how processing actually happens, only into the orchestration of these three moving parts.

The setup

Since NATS is a message broker, we need to run a server locally (version 0.8.0 as of today). Gnatsd is the official and scalable first choice. It is written in Go, so we get performance and a drop-in binary out of the box. For fans of microservices (as I am), an official Docker image is available for pulling. Also, for lazy ones (as I am), a demo server is already running at nats://demo.nats.io:4222.

Services will use Python 3.5.1, but 2.7.10 should do the job with minimal changes. Our scenario is mostly about data analysis and system administration on the backend, and Python has a wide range of tools for both areas. So let's install the requirements:

$ pip --version
pip 8.1.1

$ pip install \
    -e git+https://github.com/mcuadros/pynats@6851e84eb4b244d22ffae65e9fbf79bd9872a5b3#egg=pynats \
    click==6.6  # for cli integration

That's all. We are now ready to write services.

Publishing events

Let's warm up by sending some alerts to the cloud. First, we need to connect to the NATS server:

# -*- coding: utf-8 -*-
# vim_fenc=utf-8
#
# filename: broker.py

import pynats


def nats_conn(conf):
   """Connect to nats server from environment variables.

   The point is to allow easy switching without to change the code. You can
   read more on this approach stolen from 12 factors apps.
   """
   # the default value comes from docker-compose
   (https://docs.docker.com/compose/) services link behavior
   host = conf.get('__BROKER_HOST__', 'nats')
   port = conf.get('__BROKER_PORT__', 4222)
   opts = {
       'url': conf.get('url', 'nats://{host}:{port}'.format(host=host, port=port)),
       'verbose': conf.get('verbose', False)
   }

   print('connecting to broker ({opts})'.format(opts=opts))
   conn = pynats.Connection(**opts)
   conn.connect()
   return conn
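The configuration lookup is easier to reason about once it is pulled out of the connection logic. The sketch below isolates it as a pure function (broker_url is a hypothetical name, not part of pynats) so the precedence can be checked without a running server: an explicit url wins, otherwise __BROKER_HOST__ and __BROKER_PORT__ fill the template, falling back to nats:4222.

```python
# The configuration lookup from nats_conn, isolated as a pure function so it
# can be checked without a running server. `broker_url` is a hypothetical
# helper name; it is not part of pynats.
import os
from typing import Mapping


def broker_url(conf: Mapping[str, str]) -> str:
    host = conf.get('__BROKER_HOST__', 'nats')
    port = conf.get('__BROKER_PORT__', '4222')
    # An explicit 'url' entry takes precedence over host and port.
    return conf.get('url', 'nats://{host}:{port}'.format(host=host, port=port))


print(broker_url({}))                                   # nats://nats:4222
print(broker_url({'__BROKER_HOST__': 'demo.nats.io'}))  # nats://demo.nats.io:4222
print(broker_url({'url': 'nats://127.0.0.1:4222'}))     # nats://127.0.0.1:4222
print(broker_url(os.environ))  # depends on what your shell exports
```

Because os.environ is itself a mapping, the same function works unchanged for the real environment and for plain dictionaries in tests.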

This should be enough to start our client:

# -*- coding: utf-8 -*-
# vim_fenc=utf-8
#
# filename: observer.py

import os
import broker


def send(channel, msg):
   # use environment variables for configuration
   nats = broker.nats_conn(os.environ)
   nats.publish(channel, msg)
   nats.close()
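For the curious, publish has little to do on the wire: NATS speaks a plain-text protocol where a publication is a PUB line carrying the subject and the payload size in bytes, followed by the payload itself. The hypothetical encode_pub helper below only builds those bytes to show the framing; it is not pynats's implementation.

```python
# Hypothetical helper showing what a publish boils down to in the NATS text
# protocol:  PUB <subject> [reply-to] <#bytes>\r\n<payload>\r\n
# pynats builds and sends such frames for us; this is not its actual code.
from typing import Optional


def encode_pub(subject: str, payload: str, reply_to: Optional[str] = None) -> bytes:
    if not subject or any(ch.isspace() for ch in subject):
        raise ValueError('subject must be non-empty and contain no whitespace')
    data = payload.encode('utf-8')
    fields = [subject]
    if reply_to:
        fields.append(reply_to)
    # The size field counts payload bytes, not characters.
    fields.append(str(len(data)))
    header = 'PUB ' + ' '.join(fields) + '\r\n'
    return header.encode('utf-8') + data + b'\r\n'


frame = encode_pub('terminator_detected', 'Terminator just dropped in our space-time')
print(frame)
```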

And right after that, a few lines of code to shape a CLI tool:

#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim_fenc=utf-8
#
# filename: __main__.py

import click
import observer


@click.command()
@click.argument('command')
@click.option('--on', default='some_event', help='messages topic name')
def main(command, on):
   if command == 'send':
       click.echo('publishing message')
       observer.send(on, 'Terminator just dropped in our space-time')

if __name__ == '__main__':
   main()

Running chmod +x ./__main__.py gives the script execution permission, so we can test how our first bytes are doing.

$ # `click` package gives us a productive cli interface
$ ./__main__.py --help
Usage: __main__.py [OPTIONS] COMMAND

Options:
  --on TEXT  messages topic name
  --help     Show this message and exit.

$ __BROKER_HOST__="demo.nats.io" ./__main__.py send --on=click
connecting to broker ({'verbose': False, 'url': 'nats://demo.nats.io:4222'})
publishing message ...

This is indeed quite poor in feedback, but no exception means that we did connect to the server and published a message.

Reacting to events

We're done with the heavy lifting! Now that interesting events are flying through the Internet, we can catch them and actually provide business value. Don't forget the point: let the team write reactive programs without worrying about how they will be triggered. I found the following snippet to be a readable syntax for such a goal:

# filename: __main__.py

import observer


@observer.On('terminator_detected')
def alert_sarah_connor(msg):
   print(msg.data)

As the capital letter of On suggests, this is a Python class wrapping a NATS connection. It aims to call the decorated function whenever a new message goes through the given channel.
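To see the decorator mechanics in isolation, here is a sketch against a hypothetical in-memory connection (FakeConnection and LocalOn are invented for this example, and LocalOn takes the connection explicitly instead of building one from the environment). Decorating subscribes the function once; calling the decorated name runs the wait loop, which injects each message as an argument:

```python
# Sketch of the mechanics behind the On decorator. FakeConnection is a
# hypothetical in-memory stand-in for a NATS connection, and LocalOn is a
# simplified cousin of On, not the class from this article.
from collections import deque
from types import SimpleNamespace


class FakeConnection:
    def __init__(self):
        self.pending = deque()  # queued (subject, data) pairs
        self.handlers = {}

    def subscribe(self, subject, callback):
        self.handlers[subject] = callback
        return subject

    def wait(self, count):
        # Deliver queued messages until `count` of them have reached a handler.
        delivered = 0
        while self.pending and delivered < count:
            subject, data = self.pending.popleft()
            handler = self.handlers.get(subject)
            if handler is not None:
                handler(SimpleNamespace(subject=subject, data=data))
                delivered += 1
        return delivered


class LocalOn(object):
    def __init__(self, event_name, conn, count=1):
        self._event = event_name
        self._conn = conn
        self._count = count

    def __call__(self, fn):
        # Subscribing happens once, when the decorator is applied...
        self._conn.subscribe(self._event, fn)

        def inner():
            # ...and calling the decorated name just runs the wait loop.
            return self._conn.wait(self._count)
        return inner


conn = FakeConnection()
seen = []


@LocalOn('terminator_detected', conn, count=2)
def alert_sarah_connor(msg):
    seen.append(msg.data)


conn.pending.extend([
    ('terminator_detected', 'T-800 spotted'),
    ('weather', 'sunny'),
    ('terminator_detected', 'T-1000 spotted'),
])
# Called without `msg`: the connection injects each message.
delivered = alert_sarah_connor()
```

This also explains why alert_sarah_connor can be called without arguments: the decorated name now points to inner, and the message is passed in by the connection.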

Here is a naive implementation shamefully ignoring any reasonable error handling and safe connection termination (broker.nats_conn would be much more production-ready as a context manager, but hey, we do things that don't scale, move fast, and break things):

# filename: observer.py

class On(object):

   def __init__(self, event_name, **kwargs):
       self._count = kwargs.pop('count', None)
       self._event = event_name
       self._opts = kwargs or os.environ

   def __call__(self, fn):
       nats = broker.nats_conn(self._opts)
       subscription = nats.subscribe(self._event, fn)
       def inner():
           print('waiting for incoming messages')
           # stop after `count` messages (None means wait indefinitely)
           nats.wait(count=self._count)
           # we are done
           nats.unsubscribe(subscription)
           return nats.close()
       return inner
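Following up on the context-manager remark above, here is one way it could look, sketched with contextlib.contextmanager. The subscription helper and the RecordingConnection fake are hypothetical; the helper only assumes an object exposing subscribe, wait, unsubscribe and close, as used in On. The fake raises KeyboardInterrupt to show that cleanup still happens on CTRL-C:

```python
# Sketch of the context-manager idea: unsubscribe and close are guaranteed,
# even when CTRL-C interrupts the wait. `connect` is any zero-argument
# factory returning an object with subscribe/wait/unsubscribe/close, which
# mirrors the calls used in On (an assumption, not a pynats helper).
from contextlib import contextmanager


@contextmanager
def subscription(connect, subject, callback):
    conn = connect()
    try:
        sub = conn.subscribe(subject, callback)
        try:
            yield conn
        finally:
            # Runs on normal exit and on exceptions, KeyboardInterrupt included.
            conn.unsubscribe(sub)
    finally:
        conn.close()


class RecordingConnection:
    """Hypothetical fake that records calls instead of talking to NATS."""

    def __init__(self):
        self.calls = []

    def subscribe(self, subject, callback):
        self.calls.append(('subscribe', subject))
        return 'sub-1'

    def wait(self, count=None):
        raise KeyboardInterrupt  # simulate the user hitting CTRL-C

    def unsubscribe(self, sub):
        self.calls.append(('unsubscribe', sub))

    def close(self):
        self.calls.append(('close',))


fake = RecordingConnection()
try:
    with subscription(lambda: fake, 'terminator_detected', print) as nats:
        nats.wait(count=None)
except KeyboardInterrupt:
    print('caught CTRL-C, cleaning after ourselves...')
print(fake.calls)
```

The nested try blocks make sure close runs even if subscribe itself fails, while unsubscribe only runs for a subscription that actually exists.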

Let's breathe some life into this file from __main__.py:

# filename: __main__.py

@click.command()
@click.argument('command')
@click.option('--on', default='some_event', help='messages topic name')
def main(command, on):
   if command == 'send':
       click.echo('publishing message')
       observer.send(on, 'bad robot detected')
   elif command == 'listen':
       try:
           alert_sarah_connor()
       except KeyboardInterrupt:
           click.echo('caught CTRL-C, cleaning after ourselves...')

Your linter might complain that alert_sarah_connor is called without its msg argument (the subscription injects it), but no offense, it should just work (tm):

$ # In a first terminal, listen to messages
$ __BROKER_HOST__="demo.nats.io" ./__main__.py listen
connecting to broker ({'url': 'nats://demo.nats.io:4222', 'verbose': False})
waiting for incoming messages

$ # And fire up alerts in a second terminal
$ __BROKER_HOST__="demo.nats.io" ./__main__.py send --on='terminator_detected'

The data appears in the first terminal, celebrate!
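Under the hood, what reaches the listener is a MSG frame sent by the server, carrying the subject, a subscription id, an optional reply subject and the payload size. The hypothetical parse_msg below decodes one such frame to show where msg.data comes from; pynats does this parsing for us, with its own message class.

```python
# Hypothetical parser for one NATS MSG frame, as sent by the server:
#   MSG <subject> <sid> [reply-to] <#bytes>\r\n<payload>\r\n
# pynats does this for us and hands the callback an object with .data.
from typing import NamedTuple, Optional


class Msg(NamedTuple):
    subject: str
    sid: str
    reply_to: Optional[str]
    data: bytes


def parse_msg(frame: bytes) -> Msg:
    header, sep, rest = frame.partition(b'\r\n')
    fields = header.decode('utf-8').split()
    if not sep or not fields or fields[0] != 'MSG' or len(fields) not in (4, 5):
        raise ValueError('not a complete MSG frame: {!r}'.format(header))
    reply_to = fields[3] if len(fields) == 5 else None
    size = int(fields[-1])
    # The byte count tells us exactly where the payload ends.
    data = rest[:size]
    if len(data) != size or rest[size:size + 2] != b'\r\n':
        raise ValueError('payload shorter than announced or unterminated')
    return Msg(subject=fields[1], sid=fields[2], reply_to=reply_to, data=data)


msg = parse_msg(b'MSG terminator_detected 1 18\r\nbad robot detected\r\n')
print(msg.data)
```

Relying on the announced byte count, rather than searching for the next line break, is what allows payloads to contain arbitrary bytes, including \r\n.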

Conclusion

Reactive programming implemented with the Publish/Subscribe pattern brings a lot of benefits to event-oriented products: modular development, decoupled components, scalable distributed infrastructure, and the single-responsibility principle. One should think about how data flows into the system before diving into the technical details.

This kind of approach is also gaining traction in real-time data processing pipelines (Riemann, Spark, and Kafka). NATS performance, indeed, allows developing ultra-low-latency architectures without much deployment overhead.

We covered the basics of a reactive programming design in a few lines of Python, with a lot of improvement opportunities: event filtering, built-in instrumentation, and infrastructure-wide error tracing. I hope this article gave you a building block to develop upon!

About the author

Xavier Bruhiere is the lead developer at AppTurbo in Paris, where he develops innovative prototypes to support company growth. He is addicted to learning, hacking on intriguing hot techs (both soft and hard), and practicing high intensity sports.
