/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
package kotlinx.coroutines.rx2
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
/**
 * Tests for the [rxFlowable] coroutine builder: delivering a value, delivering a failure,
 * disposing a subscription, and propagating a failure to a parent.
 *
 * The `expect(n)` / `finish(n)` / `expectUnreached()` helpers are inherited from [TestBase];
 * the numbers passed to them spell out the order in which each test anticipates its steps
 * (presumably enforced by [TestBase] -- confirm there).
 */
class FlowableTest : TestBase() {
    /** A value sent from the builder body is delivered to the subscriber's `onNext` callback. */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val flowable = rxFlowable {
            expect(4)
            send("OK")
        }
        expect(2)
        flowable.subscribe { value ->
            expect(5)
            Assert.assertThat(value, IsEqual("OK"))
        }
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /** An exception thrown from the builder body is delivered to the subscriber's `onError` callback. */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        // NOTE(review): NonCancellable is passed as the context, presumably so that the failure
        // does not cancel the enclosing runBlocking -- confirm against rxFlowable's contract.
        val flowable = rxFlowable<String>(NonCancellable) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        flowable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            Assert.assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // let the started coroutine run
        finish(6)
    }

    /** Disposing the subscription cancels the coroutine, so neither callback is ever invoked. */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val flowable = rxFlowable<String> {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        val subscription = flowable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // let the started coroutine run
        expect(5)
        subscription.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * A failing flowable that has a parent is expected to fail that parent too: the test as a whole
     * is declared (via the `expected` predicate) to end with `RuntimeException("OK")`.
     */
    @Test
    fun testCancelsParentOnFailure() = runTest(
        expected = { it is RuntimeException && it.message == "OK" }
    ) {
        // has parent, so should cancel it on failure
        rxFlowable<Unit> {
            throw RuntimeException("OK")
        }.subscribe(
            { expectUnreached() },
            { error ->
                // Use a JUnit assertion rather than Kotlin's `assert`, which is skipped
                // unless the JVM runs with assertions enabled (-ea).
                Assert.assertThat(error, IsInstanceOf(RuntimeException::class.java))
            }
        )
    }
}
|