1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.rx2
import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.hamcrest.core.*
import org.junit.*
import org.junit.Assert.*
/**
 * Tests for completables created with the `rxCompletable` coroutine builder.
 *
 * The tests cover successful completion, failure, disposal of the subscription,
 * suspending `await()` on the completable, and failure propagation to a parent.
 *
 * `expect`, `finish`, `expectUnreached` and `runTest` are inherited from [TestBase]
 * (not shown in this file). NOTE(review): they appear to assert that numbered
 * checkpoints are reached in strictly increasing order — confirm against TestBase.
 * Under that reading, the checkpoint numbers in each test spell out the expected
 * interleaving between the test body and the completable's coroutine.
 */
class CompletableTest : TestBase() {
    /**
     * Subscribes to a completable that finishes normally.
     *
     * Per the checkpoint numbering, the builder body (4) is expected to run only
     * after the test yields (3), and the completion callback (5) right after it.
     */
    @Test
    fun testBasicSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable {
            expect(4)
        }
        expect(2)
        completable.subscribe {
            expect(5)
        }
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * Subscribes to a completable whose body throws.
     *
     * The completion callback must never fire; the error callback must receive
     * a [RuntimeException] with message "OK".
     * NOTE(review): `NonCancellable` is passed as the context presumably so that the
     * failure does not cancel the enclosing `runBlocking` scope (contrast with
     * [testCancelsParentOnFailure]) — confirm.
     */
    @Test
    fun testBasicFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(NonCancellable) {
            expect(4)
            throw RuntimeException("OK")
        }
        expect(2)
        completable.subscribe({
            expectUnreached()
        }, { error ->
            expect(5)
            assertThat(error, IsInstanceOf(RuntimeException::class.java))
            assertThat(error.message, IsEqual("OK"))
        })
        expect(3)
        yield() // to completable coroutine
        finish(6)
    }

    /**
     * Disposes the subscription while the completable's coroutine is suspended.
     *
     * Neither callback may be invoked afterwards, and the code following the
     * coroutine's `yield()` must never run.
     */
    @Test
    fun testBasicUnsubscribe() = runBlocking {
        expect(1)
        val completable = rxCompletable {
            expect(4)
            yield() // back to main, will get cancelled
            expectUnreached()
        }
        expect(2)
        // nothing is called on a disposed rx2 completable
        val sub = completable.subscribe({
            expectUnreached()
        }, {
            expectUnreached()
        })
        expect(3)
        yield() // to started coroutine
        expect(5)
        sub.dispose() // will cancel coroutine
        yield()
        finish(6)
    }

    /**
     * Awaits a successfully completing completable.
     *
     * The builder body (3) is expected to run as part of `await()`, before the
     * test continues to its final checkpoint (4).
     */
    @Test
    fun testAwaitSuccess() = runBlocking {
        expect(1)
        val completable = rxCompletable {
            expect(3)
        }
        expect(2)
        completable.await() // shall launch coroutine
        finish(4)
    }

    /**
     * Awaits a completable whose body throws.
     *
     * `await()` must rethrow the [RuntimeException] with message "OK"; the line
     * after `await()` must never be reached.
     */
    @Test
    fun testAwaitFailure() = runBlocking {
        expect(1)
        val completable = rxCompletable(NonCancellable) {
            expect(3)
            throw RuntimeException("OK")
        }
        expect(2)
        try {
            completable.await() // shall launch coroutine and throw exception
            expectUnreached()
        } catch (e: RuntimeException) {
            finish(4)
            assertThat(e.message, IsEqual("OK"))
        }
    }

    /**
     * Verifies that a failing completable with a parent propagates its failure.
     *
     * The `expected` predicate given to `runTest` accepts only a
     * [RuntimeException] with message "OK"; the subscriber's error callback
     * checks the exception type as well.
     */
    @Test
    fun testCancelsParentOnFailure() = runTest(
        expected = { it is RuntimeException && it.message == "OK" }
    ) {
        // has parent, so should cancel it on failure
        rxCompletable {
            throw RuntimeException("OK")
        }.subscribe(
            { expectUnreached() },
            // Use a JUnit/Hamcrest assertion rather than Kotlin's `assert`, which is
            // only evaluated when JVM assertions are enabled (-ea); this also matches
            // the style used in testBasicFailure.
            { assertThat(it, IsInstanceOf(RuntimeException::class.java)) }
        )
    }
}
|