Reactive Extensions
Now that we've seen that the principles of the reactive programming paradigm aren't completely new to us, we can start thinking about how to put all this together. In other words, what libraries or frameworks do we really need in order to start writing reactive code?
Reactive Extensions (ReactiveX, or just Rx for short) are a set of libraries in various languages that make reactive programming easy even in languages where concepts of asynchronous and functional programming are clumsy, such as PHP. However, there's a very important distinction:
Reactive programming doesn't equal Reactive Extensions.
A Reactive Extension is a library that introduces certain principles as one of the possible ways to approach reactive programming. Very often, when somebody tells you they're using reactive programming to do something in their applications, they're in fact talking about a particular Reactive Extension library in their favorite language.
Reactive Extensions were originally made by Microsoft for .NET and called Rx.NET. Later, the library was ported by Netflix to Java as RxJava. Now, there are over a dozen supported languages, the most popular probably being RxJS, the JavaScript implementation.
All ports follow a very similar API design; however, differences do occur, and we'll point them out a couple of times. We'll be mostly interested in the differences between RxPHP and RxJS.
RxPHP is mostly uncharted territory. A more typical environment where we encounter asynchronous events is JavaScript, so we'll first demonstrate examples in JavaScript (and RxJS 5), and afterwards we will have a look at RxPHP.
Autocomplete with RxJS
Imagine we want to implement an autocomplete feature that downloads suggestions from Wikipedia (this example comes from the official collection of demos on RxJS's GitHub page):
function searchAndReturnPromise(term) {
    // perform an AJAX request and return a Promise
}

var keyup = Rx.Observable.fromEvent($('#textInput'), 'keyup')
    .map(e => e.target.value)
    .filter(text => text.length > 2)
    .debounceTime(750)
    .distinctUntilChanged();

var searcher = keyup.switchMap(searchAndReturnPromise);
Let's take a closer look at how this works:
- We create an Observable from the form input's keyup event. The fromEvent() function is built into RxJS to simplify creating Observables. We can, of course, create our own Observables as well.
- Apply the map() function. This is exactly what we have already seen above. Note that this map() function is, in fact, not Array.map() but Observable.map(), because we're not working with arrays here.
- Chain with the filter() method. Exactly the same case as with map().
- The debounceTime() method lets an event propagate down the stream only after a given period has passed without a newer event. In this case, we're using 750ms, which means that, when the user starts typing, it won't download data from Wikipedia on every keyup event, but only once the user has paused for at least 750ms.
- The distinctUntilChanged() method makes sure we're calling the AJAX request only when the value has really changed from the last time, because it makes no sense to download the same suggestions twice.
- The last statement with keyup.switchMap() guarantees that when making multiple asynchronous calls, only the last one in the stream gets processed. All the others are dismissed. This is important because, when dealing with AJAX calls, we have absolutely no control over which Promise resolves first.
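To make the point about map() and filter() being Observable methods rather than Array methods more concrete, here is a minimal sketch of a hand-rolled Observable. It is a toy stand-in and not how RxJS is implemented; the names TinyObservable and createManualSource are hypothetical and exist only for this illustration. It covers the synchronous operators only, so debounceTime() and switchMap() are left out.

```javascript
// Toy Observable for illustration only; this is NOT the RxJS implementation.
class TinyObservable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }

  // Registers a callback that receives every emitted value.
  subscribe(next) {
    return this.subscribeFn(next);
  }

  // Each operator returns a new TinyObservable wrapping the previous one.
  map(project) {
    return new TinyObservable(next =>
      this.subscribe(value => next(project(value))));
  }

  filter(predicate) {
    return new TinyObservable(next =>
      this.subscribe(value => {
        if (predicate(value)) next(value);
      }));
  }

  distinctUntilChanged() {
    return new TinyObservable(next => {
      let hasLast = false; // becomes true after the first value
      let last;
      return this.subscribe(value => {
        if (!hasLast || value !== last) {
          hasLast = true;
          last = value;
          next(value);
        }
      });
    });
  }
}

// Hypothetical helper: a source we can push values into by hand,
// standing in for the keyup events of a real input element.
function createManualSource() {
  const listeners = [];
  const observable = new TinyObservable(next => { listeners.push(next); });
  return { observable, emit: value => listeners.forEach(l => l(value)) };
}

const typed = createManualSource();
const received = [];

typed.observable
  .map(text => text.trim())
  .filter(text => text.length > 2)
  .distinctUntilChanged()
  .subscribe(text => received.push(text));

['ab', 'abc', 'abc ', 'abcd', 'abc'].forEach(typed.emit);

console.log(received); // [ 'abc', 'abcd', 'abc' ]
```

The values flow through the chain one at a time as they are emitted, which is the main difference from the Array methods of the same name that operate on a complete collection.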
If we didn't use RxJS, this feature would require multiple state variables: at least the last value from the input, the time the last event occurred, and the last requested value for the AJAX call. With RxJS, we can focus on what we want to do and not worry about its implementation details (declarative approach).
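For comparison, the state variables mentioned above might look roughly like the following imperative sketch. This is only one possible way to write it, under stated assumptions: the function name createImperativeAutocomplete, the onResults callback and the injectable timers object are hypothetical and not part of the original demo, and search(term) is assumed to return a Promise-like object, as searchAndReturnPromise() does.

```javascript
// Imperative sketch of the autocomplete logic, without RxJS.
// Assumptions: `search(term)` returns a Promise-like object, and `timers`
// is injectable so the logic can be exercised outside a browser.
const defaultTimers = {
  set: (fn, ms) => setTimeout(fn, ms),
  clear: id => clearTimeout(id),
};

function createImperativeAutocomplete(search, onResults, delayMs = 750,
                                      timers = defaultTimers) {
  let pendingTimer = null; // state replacing debounceTime()
  let lastTerm;            // state replacing distinctUntilChanged()
  let latestRequest = 0;   // state replacing switchMap()

  return function onKeyup(text) {
    if (text.length <= 2) return; // same condition as filter()

    // Restart the waiting period on every accepted keystroke.
    if (pendingTimer !== null) timers.clear(pendingTimer);
    pendingTimer = timers.set(() => {
      pendingTimer = null;
      if (text === lastTerm) return; // same term as last time, skip
      lastTerm = text;

      const requestId = ++latestRequest;
      search(text).then(results => {
        // Ignore responses that belong to an outdated request.
        if (requestId === latestRequest) onResults(results);
      });
    }, delayMs);
  };
}
```

In a browser, the returned function could be attached with something like input.addEventListener('keyup', e => onKeyup(e.target.value)). Even in this small sketch, three pieces of mutable state have to be kept consistent by hand, which the RxJS chain expresses as three operator calls.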
With Reactive Extensions, this approach fulfills everything we described above about reactive programming, functional programming and, most of all, declarative programming.
Mouse position on drag and drop
Let's have a look at a slightly more complicated example in RxJS. We want to track the relative mouse position from where we start dragging an HTML element until we release it (mouseup event).
Pay attention to how this example combines multiple Observables (this example also comes from the official collection of demos on RxJS's GitHub page):
var mouseup = Rx.Observable.fromEvent(dragTarget, 'mouseup');
var mousemove = Rx.Observable.fromEvent(document, 'mousemove');
var mousedown = Rx.Observable.fromEvent(dragTarget, 'mousedown');

var mousedrag = mousedown.mergeMap(md => {
    var sX = md.offsetX, sY = md.offsetY;
    return mousemove.map(mm => {
        mm.preventDefault();
        return {left: mm.clientX - sX, top: mm.clientY - sY};
    }).takeUntil(mouseup);
});

var subscription = mousedrag.subscribe(pos => {
    dragTarget.style.top = pos.top + 'px';
    dragTarget.style.left = pos.left + 'px';
});
Notice that mousedrag forwards the values of the inner Observable returned by return mousemove.map(...) inside the mergeMap() callback, and that each drag emits events only until a mouseup event is emitted, thanks to takeUntil(mouseup).
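To see which values such a stream would produce, the following sketch replays a recorded list of events and applies the same arithmetic. It is only an approximation for illustration: replayDrag is a hypothetical helper, plain objects stand in for DOM events, and it assumes every mousedown is followed by a mouseup before the next mousedown (with mergeMap(), overlapping drags would each keep their own inner Observable).

```javascript
// Replays a recorded list of events and returns the positions that a
// mousedrag-like stream would emit. Plain objects stand in for DOM events.
function replayDrag(events) {
  const positions = [];
  let start = null; // offset captured on mousedown, null while not dragging

  for (const e of events) {
    if (e.type === 'mousedown') {
      // Remember where inside the element the drag started.
      start = { x: e.offsetX, y: e.offsetY };
    } else if (e.type === 'mouseup') {
      // Plays the role of takeUntil(mouseup): the current drag ends.
      start = null;
    } else if (e.type === 'mousemove' && start !== null) {
      positions.push({ left: e.clientX - start.x, top: e.clientY - start.y });
    }
  }
  return positions;
}

const recorded = [
  { type: 'mousemove', clientX: 5, clientY: 5 },     // ignored: no drag yet
  { type: 'mousedown', offsetX: 10, offsetY: 5 },
  { type: 'mousemove', clientX: 110, clientY: 55 },
  { type: 'mousemove', clientX: 120, clientY: 60 },
  { type: 'mouseup' },
  { type: 'mousemove', clientX: 300, clientY: 300 }, // ignored: drag ended
];

console.log(replayDrag(recorded));
// [ { left: 100, top: 50 }, { left: 110, top: 55 } ]
```

Subtracting the offset captured on mousedown keeps the element under the same point of the cursor where it was grabbed, instead of snapping its top-left corner to the pointer.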
Normally, without RxJS and with a typical imperative approach, this would be even more complicated than the previous example, with more state variables.
Of course, this requires some basic knowledge of what functions are available for Observables, but even without any previous experience, the code should be reasonably easy to understand. Yet again, the implementation details are completely hidden from us.