Reactive programming is a paradigm where the main focus is working with an asynchronous data flow. Reactive Extensions allow you to work with asynchronous data streams. Reactive Extensions is an agnostic framework (this means it has implementations for several languages), and can be used in other platforms (such as RxJava, RxSwift, and so on). This makes learning Reactive Extensions (and functional reactive programming) really useful, as you can use it to improve your code on different platforms.
One of these types, Reactive Extensions for JavaScript (RxJS) is a reactive streams library for JS that can be used both in the browser or in the server-side using Node.js. RxJS is a library for reactive programming using Observables. Observables provide support for passing messages between publishers and subscribers in your application. Observables offer significant benefits over other techniques for event handling, asynchronous programming, and handling multiple values.
In this article, we will learn about the different types of observables in the context of RxJS and a few different ways of creating them.
This article is an excerpt from the book, Mastering Reactive JavaScript, written by Erich de Souza Oliveira.
RxJS is divided into modules. This way, you can create your own bundle with only the modules you're interested in. However, we will always use the official bundle with all the contents from RxJS; by doing so, we'll not have to worry about whether a certain module exists in our bundle or not. So, let's follow the steps described here to install RxJS.
To install it on your server, just run the following command inside a node project:
npm i rx@4.1.0 -save
To add it to an HTML page, just paste the following code snippet inside your HTML:
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.1.0/rx.all.js"> </script>
If you're running inside a node program, you need to have the RxJS library in each JavaScript file that you want to use. To do this, add the following line to the beginning of your JavaScript file:
var Rx = require('rx');
The preceding line will be omitted in all examples, as we expect you to have added it before testing the sample code.
Here we will see a list of methods to create an observable from common event sources. This is not an exhaustive list, but it contains the most important ones.
We can create an observable from iterable objects using the from() method. An iterable in JavaScript can be an array (or an array-like object) or other iterates added in ES6 (such as Set() and map()). The from() method has the following signature:
Rx.Observable.from(iterable,[mapFunction],[context],[scheduler]);
Usually, you will pass only the first argument. Others arguments are optional; you can see them here:
Now let's see some examples on how we can create observables from iterables.
To create an observable from an array, you can use the following code:
Rx.Observable .from([0,1,2]) .subscribe((a)=>console.log(a));
This code prints the following output:
0 1 2
Now let's introduce a minor change in our code, to add the mapFunction argument to it, instead of creating an observable to propagate the elements of this array. Let's use mapFunction to propagate the double of each element of the following array:
Rx.Observable .from([0,1,2], (a) => a*2) .subscribe((a)=>console.log(a));
This prints the following output:
0 2 4
We can also use this method to create an observable from an arguments object. To do this, we need to run from() in a function. This way, we can access the arguments object of the function. We can implement it with the following code:
var observableFromArgumentsFactory = function(){ return Rx.Observable.from(arguments); }; observableFromArgumentsFactory(0,1,2) .subscribe((a)=>console.log(a));
If we run this code, we will see the following output:
0 1 2
One last usage of this method is to create an observable from either Set() or Map(). These data structures were added to ES6. We can implement it for a set as follows:
var set = new Set([0,1,2]); Rx.Observable .from(set) .subscribe((a)=>console.log(a));
This code prints the following output:
0 1 2
We can also use a map as an argument for the from() method, as follows:
var map = new Map([['key0',0],['key1',1],['key2',2]]); Rx.Observable .from(map) .subscribe((a)=>console.log(a));
This prints all the key-value tuples on this map:
[ 'key0', 0 ] [ 'key1', 1 ] [ 'key2', 2 ]
All observables created from this method are cold observables. As discussed before, this means it fires the same sequence for all the observers. To test this behavior, create an observable and add an Observer to it; add another observer to it after a second:
var observable = Rx.Observable.from([0,1,2]);
observable.subscribe((a)=>console.log('first subscriber receives => '+a));
setTimeout(()=>{
observable.subscribe((a)=>console.log('second subscriber receives => '+a));
},1000);
If you run this code, you will see the following output in your console, showing both the subscribers receiving the same data as expected:
first subscriber receives => 0 first subscriber receives => 1 first subscriber receives => 2 second subscriber receives => 0 second subscriber receives => 1 second subscriber receives => 2
Now that we have discussed how to create an observable from a sequence, let's see how we can create an observable from a sequence factory. RxJS has a built-in method called generate() that lets you create an observable from an iteration (such as a for() loop). This method has the following signature:
Rx.Observable.generate(initialState, conditionFunction, iterationFunction, resultFactory, [scheduler]);
In this method, the only optional parameter is the last one. A brief description of all the parameters is as follows:
Before checking out an example code for this method, let's see some code that implements one of the most basic constructs in a program: a for() loop. This is used to generate an array from an initial value to a final value. We can produce this array with the following code:
var resultArray=[]; for(var i=0;i < 3;i++){ resultArray.push(i) } console.log(resultArray);
This code prints the following output:
[0,1,2]
When you create a for() loop, you basically give to it the following: an initial state (the first argument), the condition to stop the iteration (the second argument), how to iterate over the value (the third argument), and what to do with the value (block). Its usage is very similar to the generate() method. Let's do the same thing, but using the generate() method and creating an observable instead of an array:
Rx.Observable.generate( 0, (i) => i<3, (i) => i+1, (i) => i ).subscribe((i) => console.log(i));
This code will print the following output:
0 1 2
Another common source of data for observables are ranges. With the range() method, we can easily create an observable for a sequence of values in a range. The range() method has the following signature:
Rx.Observable.range(first, count, [scheduler]);
The last parameter in the following list is the only optional parameter in this method:
We can create an observable using a range with the following code:
Rx.Observable .range(0, 4) .subscribe((i)=>console.log(i));
This prints the following output:
0 1 2 3
In the previous chapter, we discussed how to create timed sequences in bacon.js. In RxJS, we have two different methods to implement observables emitting values with a given interval. The first method is interval(). This method emits an infinite sequence of integers starting from one every x milliseconds; it has the following signature:
Rx.Observable.interval(interval, [scheduler]);
The interval parameter is mandatory, and the second argument is optional:
Run the following code:
Rx.Observable .interval(1000) .subscribe((i)=> console.log(i));
You will see an output as follows; you will have to stop your program (hitting Ctrl+C) or it will keep sending events:
0 1 2
The interval() method sends the first value of the sequence after the given period of interval and keeps sending values after each interval.
RxJS also has a method called timer(). This method lets you specify a due time to start the sequence or even generate an observable of only one value emitted after the due time has elapsed. It has the following signature:
Rx.Observable.timer(dueTime, [interval], [scheduler]);
Here are the parameters:
We can create an observable from the timer() method with the following code:
Rx.Observable .timer(1000,500) .subscribe((i)=> console.log(i));
You will see an output that will be similar to the following; you will have to stop your program or it will keep sending events:
0 1 2
We can also use this method to generate only one value and finish the sequence. We can do this omitting the interval parameter, as shown in the following code:
Rx.Observable .timer(1000) .subscribe((i)=> console.log(i));
If you run this code, it will only print 0 in your console and finish.
We learned about various RxJS Observables and a few different ways of creating them. Read the book, Mastering Reactive JavaScript, to create powerful applications using RxJs without compromising performance.