File: ReactiveStreamTckTest.kt

package info (click to toggle)
kotlinx-coroutines 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 4,628 kB
  • sloc: xml: 418; sh: 322; javascript: 60; makefile: 17; java: 8
file content (63 lines) | stat: -rw-r--r-- 2,127 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.reactivestreams.*
import org.reactivestreams.tck.*
import org.testng.*
import org.testng.annotations.*


/**
 * TestNG entry point that runs the Reactive Streams TCK publisher verification against
 * publishers created with the `publish` coroutine builder, once for every [Dispatcher] constant.
 */
class ReactiveStreamTckTest {

    /**
     * TestNG factory fed by the "dispatchers" data provider.
     *
     * @param dispatcher the dispatcher constant the produced suite will run its publishers on.
     * @return a one-element array holding a [ReactiveStreamTckTestSuite] bound to [dispatcher].
     */
    @Factory(dataProvider = "dispatchers")
    fun createTests(dispatcher: Dispatcher): Array<Any> {
        return arrayOf(ReactiveStreamTckTestSuite(dispatcher))
    }

    /**
     * Data provider for [createTests].
     *
     * @return one single-element argument row per [Dispatcher] constant.
     */
    @DataProvider(name = "dispatchers")
    public fun dispatchers(): Array<Array<Any>> = Dispatcher.values().map { arrayOf<Any>(it) }.toTypedArray()


    /**
     * TCK publisher verification for `publish`-built publishers running on the given [dispatcher].
     *
     * The [TestEnvironment] is created with 500 for both of its constructor arguments.
     */
    public class ReactiveStreamTckTestSuite(
        private val dispatcher: Dispatcher
    ) : PublisherVerification<Long>(TestEnvironment(500, 500)) {

        // Scope used by every publisher below: the dispatcher under test combined with NonCancellable.
        private val scope = CoroutineScope(dispatcher.dispatcher + NonCancellable)

        /** Creates a publisher that sends the numbers `1..elements` in order. */
        override fun createPublisher(elements: Long): Publisher<Long> =
            scope.publish {
                for (i in 1..elements) send(i)
            }

        /** Creates a publisher whose body throws [TestException] before sending anything. */
        override fun createFailedPublisher(): Publisher<Long> =
            scope.publish {
                throw TestException()
            }

        /**
         * Runs the inherited spec 3.13 verification, except on [Dispatcher.DEFAULT] where it is skipped.
         *
         * @throws SkipException when the suite is bound to [Dispatcher.DEFAULT].
         */
        @Test
        public override fun required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber() {
            // This test fails on the default dispatcher because it retains a reference to the last task
            // in the structure of its GlobalQueue, so it is not run with the default dispatcher.
            // It is reported as skipped (rather than returning silently, which TestNG would count as a
            // pass) so that the gap in coverage stays visible in the test report.
            // todo: remove it when CoroutinesScheduler is improved
            if (dispatcher == Dispatcher.DEFAULT) {
                throw SkipException("Skipped on the default dispatcher: it retains a reference to the last task")
            }
            super.required_spec313_cancelMustMakeThePublisherEventuallyDropAllReferencesToTheSubscriber()
        }

        /**
         * Always skipped: the inherited optional spec 1.05 verification is never invoked.
         *
         * @throws SkipException unconditionally.
         */
        @Test
        public override fun optional_spec105_emptyStreamMustTerminateBySignallingOnComplete() {
            throw SkipException("Skipped")
        }

        /** Exception thrown by the publisher returned from [createFailedPublisher]. */
        class TestException : Exception()
    }
}

/**
 * Dispatcher choices used to parameterize the TCK suite; each constant carries the
 * [CoroutineDispatcher] it stands for in its [dispatcher] property.
 */
enum class Dispatcher(val dispatcher: CoroutineDispatcher) {
    /** Backed by [Dispatchers.Default]. */
    DEFAULT(dispatcher = Dispatchers.Default),

    /** Backed by [Dispatchers.Unconfined]. */
    UNCONFINED(dispatcher = Dispatchers.Unconfined)
}