RxKotlin revolves around the Observable type, which represents a stream of data or events delivered through a push mechanism (as opposed to the pull mechanism of the iterator pattern used in traditional programs). It is therefore lazy, and it can be used both synchronously and asynchronously.
It will be easier for us to understand if we start with a simple example that works with a list of data. So, here is the code:
fun main(args: Array<String>) {
    val list: List<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f) // 1
    val iterator = list.iterator() // 2
    while (iterator.hasNext()) { // 3
        println(iterator.next()) // 4: prints each element
    }
}
The program prints each element on its own line: One, 2, Three, Four, 4.5, Five, and 6.0.
So, let's go through the program line by line to understand how it works.
At comment 1, we create a list of seven items (declaring it as List<Any> lets it hold values of mixed types). At comment 2, we create an iterator from the list so that we can iterate over the data. At comment 3, a while loop pulls data from the list through the iterator, and at comment 4, we print each item.
The thing to notice is that we're pulling data from the list, and the current thread is blocked until each item is received and ready. Now imagine that the data comes from a network call or a database query instead of a List; the thread could stay blocked for a long time. You could, of course, move those operations to a separate thread, but that adds complexity of its own.
Give it a thought: which is the better approach? Making the program wait for data, or pushing data to the program whenever it's available?
The building blocks of the ReactiveX framework (be it RxKotlin or RxJava) are observables. The Observable class is essentially the opposite of the Iterator interface. It has an underlying collection or computation that produces values for a consumer. The difference is that the consumer doesn't pull these values from the producer, as in the iterator pattern; instead, the producer pushes the values to the consumer as notifications.
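To make the contrast concrete, here is a minimal sketch of the push idea in plain Kotlin, without any Rx library. The PushSource class and its subscribe function are hypothetical names invented for this illustration; they are not part of RxKotlin, and the real Observable does considerably more (error handling, disposal, schedulers).

```kotlin
// Hypothetical, simplified producer for illustration; not the RxKotlin Observable.
class PushSource<T>(private val items: List<T>) {
    // The producer drives the loop and hands each value to the consumer.
    fun subscribe(onNext: (T) -> Unit, onComplete: () -> Unit) {
        for (item in items) {
            onNext(item)   // push one value to the consumer
        }
        onComplete()       // signal that no more values will follow
    }
}

fun main() {
    val source = PushSource(listOf<Any>("One", 2, "Three"))

    // The consumer only registers callbacks; it never calls next() itself.
    source.subscribe(
        onNext = { println("Received: $it") },
        onComplete = { println("Done!") }
    )
}
```

Notice that the loop has moved from the consumer into the producer: the consumer no longer asks for the next value, it only reacts when one arrives.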
So, let's take the same example again, this time with an observable:
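// Imports assume RxKotlin 2.x; RxKotlin 3.x uses the io.reactivex.rxjava3 packages instead.
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {
    val list: List<Any> = listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f) // 1
    val observable: Observable<Any> = list.toObservable()
    observable.subscribeBy( // named arguments for lambda subscribers
        onNext = { println(it) },
        onError = { it.printStackTrace() },
        onComplete = { println("Done!") }
    )
}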
This program prints the same items as the previous one, followed by Done! once the list is exhausted. The difference is in the approach. So, let's see how it actually works:
- A list is created (the same as in the previous example).
- An observable instance is created from that list.
- We subscribe to the observable instance (using named arguments for the lambdas, which are covered in detail later).
As we subscribe to the observable, each item is pushed to onNext as it becomes ready. When all the data has been pushed, onComplete is called; if an error occurs, onError is called instead.
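The following sketch illustrates that notification order by recording what a subscriber receives from a source that completes normally and from one that fails midway. It assumes RxKotlin 2.x package names; collectEvents is a hypothetical helper written for this example, and the failing source is built with RxJava's Observable.create purely to trigger an error on purpose.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy

// Hypothetical helper: records the notifications a subscriber receives, in order.
fun collectEvents(source: Observable<String>): List<String> {
    val events = mutableListOf<String>()
    source.subscribeBy(
        onNext = { events.add("onNext($it)") },
        onError = { events.add("onError(${it.message})") },
        onComplete = { events.add("onComplete") }
    )
    return events
}

fun main() {
    // Completes normally: every item, then onComplete.
    val ok = Observable.just("One", "Two")

    // Fails midway: one item, then onError; onComplete is never called.
    val failing = Observable.create<String> { emitter ->
        emitter.onNext("One")
        emitter.onError(IllegalStateException("boom"))
    }

    println(collectEvents(ok))      // [onNext(One), onNext(Two), onComplete]
    println(collectEvents(failing)) // [onNext(One), onError(boom)]
}
```

Both sources emit on the calling thread here, which is why the helper can return the recorded events immediately after subscribing.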
So, you've learned to use observable instances, which are quite similar to the iterator instances we're already familiar with. We can use observable instances to build asynchronous streams and push data updates to their subscribers (even to multiple subscribers). This was a simple implementation of the reactive programming paradigm: the data is propagated to all the interested parties, the subscribers.
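As a small illustration of the multiple-subscribers point, the sketch below subscribes twice to one observable built from a list. It assumes RxKotlin 2.x package names, and receive is a hypothetical helper introduced only for this example. An observable created from a list replays the full sequence to each subscriber independently, so both subscribers see every item.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

// Hypothetical helper: subscribes to the source and returns what that subscriber received.
fun receive(source: Observable<Any>): List<Any> {
    val received = mutableListOf<Any>()
    source.subscribeBy(onNext = { received.add(it) })
    return received
}

fun main() {
    val list: List<Any> = listOf("One", 2, "Three")
    val observable: Observable<Any> = list.toObservable()

    // Two independent subscriptions to the same observable instance.
    val first = receive(observable)
    val second = receive(observable)

    println("First subscriber:  $first")
    println("Second subscriber: $second")
}
```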