As covered earlier, Kotlin provides extension functions. These can be an enormously helpful alternative to using just compose() and lift().
For instance, we could not use Transformers and compose() to turn an Observable<T> into a Single<R>, because compose() always yields another Observable. But this is easy to do with Kotlin extension functions. In the following code, we create a toSet() operator and add it to Observable<T>:
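import io.reactivex.Observable

fun main(args: Array<String>) {
    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta",
        "Epsilon")
    // asSet is a Single that emits one Set holding all emissions
    val asSet = source.toSet()
}

// Extension function: collects every emission into a HashSet,
// then exposes it through the read-only Set<T> interface
fun <T> Observable<T>.toSet() =
    collect({ HashSet<T>() }, { set, next -> set.add(next) })
        .map { it as Set<T> }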
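To see what the operator emits, one option is to subscribe to the returned Single. The following is only a minimal sketch, assuming RxJava 2 (the io.reactivex package used above) is on the classpath. The explicit return type and the duplicate "Alpha" element are additions for illustration and are not part of the original listing:

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Same extension function as above, with the return type spelled out
fun <T> Observable<T>.toSet(): Single<Set<T>> =
    collect({ HashSet<T>() }, { set, next -> set.add(next) })
        .map { it as Set<T> }

fun main(args: Array<String>) {
    // "Alpha" appears twice on purpose (illustrative assumption)
    val source = Observable.just("Alpha", "Beta", "Gamma", "Delta",
        "Epsilon", "Alpha")

    // The Single emits exactly one item: the collected Set.
    // Only the size is printed, since HashSet does not guarantee ordering.
    source.toSet().subscribe { set -> println(set.size) } // prints 5
}
```

Because the elements are collected into a set, the repeated "Alpha" is stored only once, so the emitted set holds five elements rather than six.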
The toSet() function returns a Single<Set<T>>, and it was called on...