Writing a Reddit Reader with RxPHP

  • 09 Jan 2017


In this article by Martin Sikora, author of the book, PHP Reactive Programming, we will cover writing a CLI Reddit reader app using RxPHP, and we will see how Disposables are used in the default classes that come with RxPHP, and how these are going to be useful for unsubscribing from Observables in our app.


Examining RxPHP's internals

As we know, Disposables are a means of releasing resources used by Observers, Observables, Subjects, and so on. In practice, a Disposable is returned, for example, when subscribing to an Observable. Consider the following code from the default Rx\Observable::subscribe() method:

function subscribe(ObserverI $observer, $scheduler = null) {
   $this->observers[] = $observer;
   $this->started = true;
   return new CallbackDisposable(function () use ($observer) {
       $this->removeObserver($observer);
   });
}

This method first adds the Observer to the array of all subscribed Observers. It then marks this Observable as started and, at the end, it returns a new instance of the CallbackDisposable class, which takes a Closure as an argument and invokes it when it's disposed. This is probably the most common use case for Disposables.

This Disposable just removes the Observer from the array of subscribers, so that Observer receives no more events emitted from this Observable.
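To make the mechanism concrete, here is a minimal, self-contained sketch of the same idea. The classes SketchDisposable and SketchObservable are hypothetical stand-ins written for this illustration, not RxPHP's real classes, and Observers are reduced to plain callables:

```php
<?php
// Hypothetical stand-ins for illustration only; not RxPHP's real classes.
final class SketchDisposable
{
    private $action;
    private $disposed = false;

    public function __construct(callable $action)
    {
        $this->action = $action;
    }

    public function dispose(): void
    {
        if ($this->disposed) {
            return; // Disposing twice is a no-op in this sketch.
        }
        $this->disposed = true;
        ($this->action)();
    }
}

final class SketchObservable
{
    /** @var callable[] */
    private $observers = [];

    public function subscribe(callable $observer): SketchDisposable
    {
        $this->observers[] = $observer;

        return new SketchDisposable(function () use ($observer) {
            // Drop only this Observer; the others stay subscribed.
            $this->observers = array_filter(
                $this->observers,
                function ($o) use ($observer) { return $o !== $observer; }
            );
        });
    }

    public function emit($value): void
    {
        foreach ($this->observers as $observer) {
            $observer($value);
        }
    }
}

$received = [];
$source = new SketchObservable();
$d1 = $source->subscribe(function ($v) use (&$received) { $received[] = "A:$v"; });
$d2 = $source->subscribe(function ($v) use (&$received) { $received[] = "B:$v"; });

$source->emit(1);   // Both Observers receive 1.
$d1->dispose();     // Observer A unsubscribes.
$source->emit(2);   // Only Observer B receives 2.

echo implode(' ', $received), "\n"; // A:1 B:1 B:2
```

Returning the Disposable from subscribe() keeps the unsubscribe logic next to the code that registered the Observer, so callers never need to know how subscribers are stored.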

A closer look at subscribing to Observables

It should be obvious that Observables need to keep track of their subscribers in such a way that all subscribed Observers can be iterated. Unsubscribing via a Disposable then needs to remove one particular Observer from the array of all subscribed Observers.

However, if we have a look at how most of the default Observables work, we find out that they always override the Observable::subscribe() method and usually completely omit the part where it should hold an array of subscribers. Instead, they just emit all available values to the subscribed Observer and finish with the onCompleted() signal immediately after that. For example, we can have a look at the source code of the subscribe() method of the Rx\Observable\ReturnObservable class:

function subscribe(ObserverI $obs, SchedulerI $sched = null) {
   $val = $this->value;
   // Fall back to ImmediateScheduler when no Scheduler was passed.
   $scheduler = $sched ?: new ImmediateScheduler();
   $disp = new CompositeDisposable();
   // Schedule the single value first, then the completion signal.
   $disp->add($scheduler->schedule(function() use ($obs, $val) {
       $obs->onNext($val);
   }));
   $disp->add($scheduler->schedule(function() use ($obs) {
       $obs->onCompleted();
   }));
   return $disp;
}

The ReturnObservable class takes a single value in its constructor and emits this value to every Observer as they subscribe.

The following is a nice example of how the lifecycle of an Observable might look:

  • When an Observer subscribes, it checks whether a Scheduler was also passed as an argument. Usually, it's not, so it creates an instance of ImmediateScheduler.
  • Then, an instance of CompositeDisposable is created, which is going to keep an array of all Disposables used by this method. When calling CompositeDisposable::dispose(), it iterates all disposables it contains and calls their respective dispose() methods.
  • Right after that we start populating our CompositeDisposable with the following:
    $disp->add($scheduler->schedule(function() { ... }));
  • This is something we'll see very often. SchedulerInterface::schedule() returns a DisposableInterface, which is responsible for unsubscribing and releasing resources. In this case, we're using ImmediateScheduler, which has no other logic and just evaluates the Closure immediately:
    function () use ($obs, $val) {
       $obs->onNext($val);
    }
  • Since ImmediateScheduler::schedule() doesn't need to release any resources (it didn't use any), it just returns an instance of Rx\Disposable\EmptyDisposable that does literally nothing.
  • Then the Disposable is returned, and could be used to unsubscribe from this Observable. However, as we saw in the preceding source code, this Observable doesn't let you unsubscribe, and if we think about it, it doesn't even make sense because ReturnObservable class's value is emitted immediately on subscription.
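The steps above can be sketched without RxPHP at all. Everything in the following block (the Sketch* classes and the subscribeToSingleValue() function) is hypothetical and only mirrors the lifecycle just described, assuming an immediate scheduler that runs each action as soon as it is scheduled:

```php
<?php
// Hypothetical stand-ins that mirror the steps above; not RxPHP's real classes.
interface SketchDisposableInterface
{
    public function dispose(): void;
}

final class SketchEmptyDisposable implements SketchDisposableInterface
{
    public function dispose(): void
    {
        // Nothing to release.
    }
}

final class SketchCompositeDisposable implements SketchDisposableInterface
{
    private $disposables = [];
    public $disposedCount = 0;

    public function add(SketchDisposableInterface $d): void
    {
        $this->disposables[] = $d;
    }

    public function dispose(): void
    {
        // Dispose every contained Disposable, then forget them.
        foreach ($this->disposables as $d) {
            $d->dispose();
            $this->disposedCount++;
        }
        $this->disposables = [];
    }
}

final class SketchImmediateScheduler
{
    public function schedule(callable $action): SketchDisposableInterface
    {
        $action(); // Runs right away, so nothing is left to cancel.
        return new SketchEmptyDisposable();
    }
}

function subscribeToSingleValue($value, callable $onNext, callable $onCompleted): SketchCompositeDisposable
{
    $scheduler = new SketchImmediateScheduler();
    $disp = new SketchCompositeDisposable();
    $disp->add($scheduler->schedule(function () use ($onNext, $value) {
        $onNext($value);
    }));
    $disp->add($scheduler->schedule($onCompleted));
    return $disp;
}

$log = [];
$disp = subscribeToSingleValue(
    42,
    function ($v) use (&$log) { $log[] = "next:$v"; },
    function () use (&$log) { $log[] = 'completed'; }
);
$logBeforeDispose = $log; // Everything was already emitted during subscription.
$disp->dispose();         // Disposing afterwards changes nothing observable.

echo implode(', ', $log), "\n"; // next:42, completed
```

By the time the caller holds the returned Disposable, both callbacks have already run, which is why disposing it has no visible effect in this case.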

The same applies to other similar Observables, such as IteratorObservable, RangeObservable or ArrayObservable. These just contain recursive calls with Schedulers but the principle is the same.

A good question is, why on Earth is this so complicated? All the preceding code does could be stripped into the following three lines (assuming we're not interested in using Schedulers):

function subscribe(ObserverI $observer) {
   $observer->onNext($this->value);
   $observer->onCompleted();
}

Well, for ReturnObservable this might be true, but in real applications, we very rarely use any of these primitive Observables. It's true that we usually don't even need to deal with Schedulers. However, the ability to unsubscribe from Observables or clean up any resources when unsubscribing is very important and we'll use it in a few moments.

A closer look at Operator chains

Before we start writing our Reddit reader, we should talk briefly about an interesting situation that might occur, so it doesn't catch us unprepared later.

We're also going to introduce a new type of Observable, called ConnectableObservable. Consider this simple Operator chain with two subscribers:

// rxphp_filters_observables.php
use Rx\Observable\RangeObservable;
use Rx\Observable\ConnectableObservable;

$connObs = new ConnectableObservable(new RangeObservable(0, 6));
$filteredObs = $connObs
   ->map(function($val) {
       return $val ** 2;
   })
   ->filter(function($val) {
       return $val % 2;
   });

$disposable1 = $filteredObs->subscribeCallback(function($val) {
   echo "S1: {$val}\n";
});
$disposable2 = $filteredObs->subscribeCallback(function($val) {
   echo "S2: {$val}\n";
});

// Nothing is emitted until the ConnectableObservable is connected.
$connObs->connect();

The ConnectableObservable class is a special type of Observable that behaves similarly to Subject (in fact, internally, it really uses an instance of the Subject class). Any other Observable emits all available values right after you subscribe to it. However, ConnectableObservable takes another Observable as an argument and lets you subscribe Observers to it without emitting anything. When you call ConnectableObservable::connect(), it connects Observers with the source Observables, and all values go one by one to all subscribers.

Internally, it contains an instance of the Subject class, and when we called subscribe(), it just subscribed the Observer to its internal Subject. Then, when we called the connect() method, it subscribed the internal Subject to the source Observable.
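The relationship between subscribe(), connect(), and the internal Subject can be sketched in a few lines. MiniSubject and MiniConnectable below are hypothetical classes written for this illustration (RxPHP's real classes have a richer API), and the source is assumed to be a plain callable that pushes values into whatever callback it is given:

```php
<?php
// Hypothetical stand-ins for illustration only; not RxPHP's real classes.
final class MiniSubject
{
    private $observers = [];

    public function subscribe(callable $observer): void
    {
        $this->observers[] = $observer;
    }

    public function onNext($value): void
    {
        foreach ($this->observers as $observer) {
            $observer($value);
        }
    }
}

final class MiniConnectable
{
    private $source;
    private $subject;

    // $source is any callable that pushes values into the callback it is given.
    public function __construct(callable $source)
    {
        $this->source = $source;
        $this->subject = new MiniSubject();
    }

    public function subscribe(callable $observer): void
    {
        // Only registers the Observer; nothing is emitted yet.
        $this->subject->subscribe($observer);
    }

    public function connect(): void
    {
        // Subscribes the internal Subject to the source, once per connect() call.
        ($this->source)(function ($value) {
            $this->subject->onNext($value);
        });
    }
}

$range = function (callable $emit) {
    foreach (range(0, 2) as $i) {
        $emit($i);
    }
};

$out = [];
$conn = new MiniConnectable($range);
$conn->subscribe(function ($v) use (&$out) { $out[] = "S1:$v"; });
$conn->subscribe(function ($v) use (&$out) { $out[] = "S2:$v"; });
$emittedBeforeConnect = count($out); // Still 0: subscribing alone emits nothing.
$conn->connect();

echo implode(' ', $out), "\n"; // S1:0 S2:0 S1:1 S2:1 S1:2 S2:2
```

Because the source is walked only once, each value reaches every subscriber before the next value is produced.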

In the $filteredObs variable we keep a reference to the last Observable returned from the filter() call, which is an instance of AnonymousObservable. On the next few lines, we subscribe both Observers to it.


Now, let's see what this Operator chain prints:

$ php rxphp_filters_observables.php
S1: 1
S2: 1
S1: 9
S2: 9
S1: 25
S2: 25

As we can see, each value went through both Observers in the order they were emitted. Just out of curiosity, we can also have a look at what would happen if we didn't use ConnectableObservable, and used just the RangeObservable instead:

$ php rxphp_filters_observables.php
S1: 1
S1: 9
S1: 25
S2: 1
S2: 9
S2: 25

This time, RangeObservable emitted all values to the first Observer and then, again, all values to the second Observer.

Right now, we can tell that the Observable had to generate all the values twice, which is inefficient, and with a large dataset, this might cause a performance bottleneck.
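A rough way to see this cost is to count how many values the source produces. In this sketch, $coldRange is a hypothetical stand-in for an Observable that regenerates its values on every subscription; it is a plain closure, not RxPHP's RangeObservable:

```php
<?php
$generated = 0;

// Hypothetical cold source: every call walks the whole range again.
$coldRange = function (callable $observer) use (&$generated) {
    for ($i = 0; $i < 6; $i++) {
        $generated++;
        $observer($i);
    }
};

$printed = [];
$coldRange(function ($v) use (&$printed) { $printed[] = "S1:$v"; });
$coldRange(function ($v) use (&$printed) { $printed[] = "S2:$v"; });

echo "values generated: $generated\n"; // values generated: 12
```

Six values requested by two subscribers means twelve values generated, and the first subscriber receives all of its values before the second receives any.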

Let's go back to the first example with ConnectableObservable, and modify the filter() call so it prints all the values that go through:

$filteredObs = $connObs
   ->map(function($val) {
       return $val ** 2;
   })
   ->filter(function($val) {
       echo "Filter: $val\n";
       return $val % 2;
   });

Now we run the code again and see what happens:

$ php rxphp_filters_observables.php
Filter: 0
Filter: 0
Filter: 1
S1: 1
Filter: 1
S2: 1
Filter: 4
Filter: 4
Filter: 9
S1: 9
Filter: 9
S2: 9
Filter: 16
Filter: 16
Filter: 25
S1: 25
Filter: 25
S2: 25

Well, this is unexpected! Each value is printed twice. This doesn't mean that the Observable had to generate all the values twice, however. It's not obvious at first sight what happened, but the problem is that we subscribed to the Observable at the end of the Operator chain.

As stated previously, $filteredObs is an instance of AnonymousObservable that holds many nested Closures. Calling its subscribe() method runs a Closure that was created by its predecessor, and so on. As a result, every call to subscribe() has to invoke the entire chain. While this might not be an issue in many use cases, there are situations where we might want to do some special operation inside one of the filters. Also, note that calls to the subscribe() method might be out of our control, performed by another developer who wanted to use an Observable we created for them.
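The nesting can be reproduced with plain Closures. In the following sketch, SketchHotSource, sketchMap(), and sketchFilter() are hypothetical helpers written for this illustration (they are not part of RxPHP); an "Observable" is reduced to a callable that accepts an Observer callback, and the source emits each value only once:

```php
<?php
// Hypothetical hot source: it stores callbacks and emits only when told to.
final class SketchHotSource
{
    private $observers = [];
    public $generated = 0;

    public function subscribe(callable $observer): void
    {
        $this->observers[] = $observer;
    }

    public function emitRange(int $start, int $count): void
    {
        for ($i = $start; $i < $start + $count; $i++) {
            $this->generated++;
            foreach ($this->observers as $observer) {
                $observer($i);
            }
        }
    }
}

// Each operator returns a new "subscribe" callable wrapping its predecessor.
function sketchMap(callable $subscribe, callable $fn): callable
{
    return function (callable $observer) use ($subscribe, $fn) {
        $subscribe(function ($v) use ($observer, $fn) {
            $observer($fn($v));
        });
    };
}

function sketchFilter(callable $subscribe, callable $predicate): callable
{
    return function (callable $observer) use ($subscribe, $predicate) {
        $subscribe(function ($v) use ($observer, $predicate) {
            if ($predicate($v)) {
                $observer($v);
            }
        });
    };
}

$hot = new SketchHotSource();
$filterCalls = 0;
$chain = sketchFilter(
    sketchMap([$hot, 'subscribe'], function ($v) { return $v ** 2; }),
    function ($v) use (&$filterCalls) {
        $filterCalls++;
        return $v % 2 === 1;
    }
);

$seen = [];
// Two subscriptions at the end of the chain build two independent pipelines.
$chain(function ($v) use (&$seen) { $seen[] = "S1:$v"; });
$chain(function ($v) use (&$seen) { $seen[] = "S2:$v"; });

$hot->emitRange(0, 6);

echo "generated: {$hot->generated}, filter calls: $filterCalls\n"; // generated: 6, filter calls: 12
echo implode(' ', $seen), "\n"; // S1:1 S2:1 S1:9 S2:9 S1:25 S2:25
```

The source generates six values, yet the filter predicate runs twelve times, because each subscription at the end of the chain registered its own map and filter Closures with the source.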

It's good to know that such a situation might occur and could lead to unwanted behavior.

It's sometimes hard to see what's going on inside Observables. It's very easy to get lost, especially when we have to deal with multiple Closures. Schedulers are prime examples. Feel free to experiment with the examples shown here and use a debugger to examine, step by step, what code gets executed and in what order.

So, let's figure out how to fix this. We don't want to subscribe at the end of the chain multiple times, so we can create an instance of the Subject class, to which we'll subscribe both Observers, and the Subject itself will subscribe to the AnonymousObservable as discussed a moment ago:

// ...
use Rx\Subject\Subject;

$subject = new Subject();
$connObs = new ConnectableObservable(new RangeObservable(0, 6));

// The Subject is the only subscriber at the end of the Operator chain.
$connObs
   ->map(function($val) {
       return $val ** 2;
   })
   ->filter(function($val) {
       echo "Filter: $val\n";
       return $val % 2;
   })
   ->subscribe($subject);

$disposable1 = $subject->subscribeCallback(function($val) {
   echo "S1: {$val}\n";
});
$disposable2 = $subject->subscribeCallback(function($val) {
   echo "S2: {$val}\n";
});

$connObs->connect();

Now we can run the script again and see that it does what we wanted it to do:

$ php rxphp_filters_observables.php
Filter: 0
Filter: 1
S1: 1
S2: 1
Filter: 4
Filter: 9
S1: 9
S2: 9
Filter: 16
Filter: 25
S1: 25
S2: 25

This might look like an edge case, but soon we'll see that this issue, left unhandled, could lead to some very unpredictable behavior. We'll bring out both these issues (proper usage of Disposables and Operator chains) when we start writing our Reddit reader.

Summary

In this article, we looked in more depth at how to use Disposables and Operators, how these work internally, and what this means for us. We also looked at a couple of new classes from RxPHP, such as ConnectableObservable and CompositeDisposable.
