Blocking subscribers
Recall the code blocks from previous chapters, where we used delay to make the main thread wait whenever we used an Observable or Flowable that operates on a different thread. Typical cases are using Observable.interval as a factory method or applying the subscribeOn operator. As a refresher, here is one such example:
fun main(args: Array<String>) {
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .subscribe { item -> println("Received $item") }
    runBlocking { delay(10) }
}
In this example, we switched to Schedulers.computation for the subscription. Now let's see how we can test this Observable and check that we received exactly 10 emissions:
@Test
fun `check emissions count`() {
    val emissionsCount = AtomicInteger() // (1)
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        .blockingSubscribe...
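As a minimal standalone sketch of the same idea, assuming RxJava 2 (the io.reactivex packages) is on the classpath, the following counts emissions with blockingSubscribe. That operator keeps the calling thread blocked until the source terminates, so no delay is needed. The function name countEmissions and the use of check instead of a test framework are illustrative choices, not taken from the chapter.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: subscribes on the computation scheduler and
// returns how many items were received before the source completed.
fun countEmissions(): Int {
    val emissionsCount = AtomicInteger()
    Observable.range(1, 10)
        .subscribeOn(Schedulers.computation())
        // Blocks the calling thread until onComplete or onError arrives.
        .blockingSubscribe { _ -> emissionsCount.incrementAndGet() }
    return emissionsCount.get()
}

fun main() {
    val count = countEmissions()
    // Fails with IllegalStateException if the count is not 10.
    check(count == 10) { "Expected 10 emissions but got $count" }
    println("Received $count emissions")
}
```

Because the call returns only after the Observable has completed, the counter can be read immediately afterwards without any extra synchronization or waiting.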