/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
package kotlinx.coroutines.reactive
import kotlinx.coroutines.*
import org.reactivestreams.*
import org.reactivestreams.tck.*
import org.testng.*
import org.testng.annotations.*
/**
 * TestNG entry point that runs the Reactive Streams TCK [PublisherVerification] against
 * publishers created with `publish`, producing one suite instance per [Dispatcher] constant.
 */
class ReactiveStreamTckTest {

    /**
     * TestNG factory fed by the "dispatchers" data provider.
     *
     * @param dispatcher the dispatcher configuration the created suite will use.
     * @return a single-element array holding a [ReactiveStreamTckTestSuite] bound to [dispatcher].
     */
    @Factory(dataProvider = "dispatchers")
    fun createTests(dispatcher: Dispatcher): Array<Any> =
        arrayOf(ReactiveStreamTckTestSuite(dispatcher))

    /**
     * Data provider named "dispatchers".
     *
     * @return one single-element row per [Dispatcher] constant, in declaration order.
     */
    @DataProvider(name = "dispatchers")
    fun dispatchers(): Array<Array<Any>> =
        Dispatcher.values().map { arrayOf<Any>(it) }.toTypedArray()

    /**
     * Publisher verification suite whose publishers are launched in a scope built from
     * the coroutine dispatcher carried by [dispatcher].
     *
     * NOTE(review): the two `500` arguments to `TestEnvironment` are presumably the TCK
     * default timeout and no-signals timeout in milliseconds — confirm against the TCK docs.
     */
    class ReactiveStreamTckTestSuite(
        private val dispatcher: Dispatcher
    ) : PublisherVerification<Long>(TestEnvironment(500, 500)) {

        // Scope shared by every publisher of this suite: the dispatcher under test plus NonCancellable.
        private val scope = CoroutineScope(dispatcher.dispatcher + NonCancellable)

        /** Creates a publisher that sends the numbers `1..elements` in ascending order. */
        override fun createPublisher(elements: Long): Publisher<Long> =
            scope.publish {
                for (i in 1..elements) send(i)
            }

        /** Creates a publisher whose body throws [TestException] before sending anything. */
        override fun createFailedPublisher(): Publisher<Long> =
            scope.publish {
                throw TestException()
            }

        /**
         * Runs the inherited spec 3.13 check, except on [Dispatcher.DEFAULT], where it is skipped.
         *
         * @throws SkipException when the suite runs with [Dispatcher.DEFAULT].
         */
        @Test
        override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() {
            // This test fails on the default dispatcher because it retains a reference to the
            // last task in the structure of its GlobalQueue, so it is skipped for that dispatcher.
            // The skip is signalled with SkipException (as the other skipped test in this suite
            // does) instead of returning silently, which TestNG would record as a pass.
            // todo: remove it when CoroutinesScheduler is improved
            if (dispatcher == Dispatcher.DEFAULT) {
                throw SkipException("Skipped on the default dispatcher: it retains a reference to the last task")
            }
            super.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
        }

        /**
         * Unconditionally skipped for every dispatcher.
         *
         * NOTE(review): the reason for skipping is not recorded in this file — confirm and document it.
         */
        @Test
        override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() {
            throw SkipException("Skipped")
        }

        /** Exception thrown by the failed publisher. */
        class TestException : Exception()
    }
}
/**
 * Dispatcher configurations the TCK suite is run with.
 *
 * @property dispatcher the coroutine dispatcher associated with the constant.
 */
enum class Dispatcher(
    val dispatcher: CoroutineDispatcher
) {
    /** Backed by `Dispatchers.Default`. */
    DEFAULT(Dispatchers.Default),

    /** Backed by `Dispatchers.Unconfined`. */
    UNCONFINED(Dispatchers.Unconfined)
}
|