1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.reactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import org.junit.*
import org.junit.Assert.*
class ConvertTest : TestBase() {
/**
 * Converts a launched job to a mono with `asMono` and subscribes to it.
 *
 * The numbered `expect`/`finish` calls (helpers that are not defined in this file,
 * presumably inherited from `TestBase`) lay out the intended order: the test body reaches
 * step 2 before the launched body (step 3) and the subscriber callback (step 4) run, and
 * both of those have happened by the time `yield()` returns and step 5 is recorded.
 */
@Test
fun testJobToMonoSuccess() = runBlocking {
    expect(1)
    val launched = launch {
        expect(3)
    }
    // Subscribe straight away; the callback is expected only after the job body has run.
    launched.asMono(coroutineContext).subscribe {
        expect(4)
    }
    expect(2)
    yield()
    finish(5)
}
/**
 * Converts a failing `async` job to a mono and checks that only the error callback fires.
 *
 * The job body throws `RuntimeException("OK")`. The value callback fails the test if it is
 * ever invoked, while the error callback records step 4. As in the success case, the
 * numbering of `expect`/`finish` (helpers not defined in this file) spells out the
 * intended order of events.
 */
@Test
fun testJobToMonoFail() = runBlocking {
    expect(1)
    val failing = async(NonCancellable) { // don't kill parent on exception
        expect(3)
        throw RuntimeException("OK")
    }
    failing.asMono(coroutineContext + NonCancellable).subscribe(
        { fail("no item should be emitted") },
        { expect(4) }
    )
    expect(2)
    yield()
    finish(5)
}
/**
 * Converts the same deferred value to a mono twice and checks that each conversion
 * yields "OK".
 *
 * The deferred is started in `GlobalScope` and produces its value after a 50 ms delay.
 */
@Test
fun testDeferredToMono() {
    val deferred = GlobalScope.async {
        delay(50)
        "OK"
    }
    // Two independent conversions of one deferred, created and verified one after another.
    repeat(2) { _ ->
        checkMonoValue(deferred.asMono(Dispatchers.Unconfined)) {
            assertEquals("OK", it)
        }
    }
}
/**
 * Converts a deferred that completes with `null` to a mono twice and checks that each
 * conversion is observed as a null value by `checkMonoValue`.
 */
@Test
fun testDeferredToMonoEmpty() {
    val deferred = GlobalScope.async {
        delay(50)
        null
    }
    // Two independent conversions of one deferred, created and verified one after another.
    repeat(2) { _ ->
        checkMonoValue(deferred.asMono(Dispatchers.Unconfined), ::assertNull)
    }
}
/**
 * Converts a deferred that fails with `TestRuntimeException("OK")` to a mono twice and
 * checks that each conversion reports that same exception type and message.
 */
@Test
fun testDeferredToMonoFail() {
    val deferred = GlobalScope.async {
        delay(50)
        throw TestRuntimeException("OK")
    }
    // Two independent conversions of one deferred, created and verified one after another.
    repeat(2) { _ ->
        checkErroneous(deferred.asMono(Dispatchers.Unconfined)) {
            check(it is TestRuntimeException && it.message == "OK") { "$it" }
        }
    }
}
/**
 * Converts a produced channel to a flux and checks that its elements arrive in order.
 *
 * The producer sends "O" and then "K", each after a 50 ms delay; the flux is reduced by
 * string concatenation and the result must equal "OK".
 */
@Test
fun testToFlux() {
    val channel = GlobalScope.produce {
        delay(50)
        send("O")
        delay(50)
        send("K")
    }
    val joined = channel.asFlux(Dispatchers.Unconfined).reduce { acc, next -> acc + next }
    checkMonoValue(joined) {
        assertEquals("OK", it)
    }
}
/**
 * Converts a channel whose producer fails mid-stream to a flux and checks that the
 * consumer sees both the emitted element and the failure.
 *
 * The producer sends "O" and then throws `TestException("K")`. The consuming coroutine
 * appends every received element, then appends the caught exception's message, so the
 * final text must be "OK".
 */
@Test
fun testToFluxFail() {
    val channel = GlobalScope.produce {
        delay(50)
        send("O")
        delay(50)
        throw TestException("K")
    }
    val flux = channel.asFlux(Dispatchers.Unconfined)
    val mono = GlobalScope.mono(Dispatchers.Unconfined) {
        val collected = StringBuilder()
        try {
            flux.consumeEach { collected.append(it) }
        } catch (failure: Throwable) {
            check(failure is TestException)
            // A null message is rendered as "null", exactly as string concatenation would.
            collected.append(failure.message)
        }
        collected.toString()
    }
    checkMonoValue(mono) {
        assertEquals("OK", it)
    }
}
}
|