File: PublishParentCancelStressTest.kt

package info (click to toggle)
kotlinx-coroutines 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 4,628 kB
  • sloc: xml: 418; sh: 322; javascript: 60; makefile: 17; java: 8
file content (65 lines) | stat: -rw-r--r-- 2,351 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.reactive

import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

public class PublishParentCancelStressTest : TestBase() {
    private val dispatcher = newFixedThreadPoolContext(3, "PublishParentCancelStressTest")
    private val N_TIMES = 5000 * stressTestMultiplier

    @After
    fun tearDown() {
        dispatcher.close()
    }

    @Test
    fun testStress() = runTest {
        var unhandled: Throwable? = null
        val handler = CoroutineExceptionHandler { _, ex -> unhandled = ex }
        repeat(N_TIMES) {
            val barrier = CyclicBarrier(4)
            // launch parent job for publisher
            val parent = GlobalScope.async<Unit>(dispatcher + handler) {
                val publisher = publish<Unit> {
                    // BARRIER #1 - child publisher crashes
                    barrier.await()
                    throw TestException()
                }
                var sub: Subscription? = null
                publisher.subscribe(object : Subscriber<Unit> {
                    override fun onComplete() { error("Cannot be reached") }
                    override fun onSubscribe(s: Subscription?) { sub = s }
                    override fun onNext(t: Unit?) { error("Cannot be reached" ) }
                    override fun onError(t: Throwable?) {
                        assertTrue(t is TestException)
                    }
                })
                launch {
                    // BARRIER #3 -- cancel subscription
                    barrier.await()
                    sub!!.cancel()
                }
                // BARRIER #2 -- parent completes
                barrier.await()
                Unit
            }
            // BARRIE #4 - go 1-3 together
            barrier.await()
            // Make sure exception is not lost, but incorporated into parent
            val result = kotlin.runCatching { parent.await() }
            assertTrue(result.exceptionOrNull() is TestException)
            // Make sure unhandled exception handler was not invoked
            assertNull(unhandled)
        }
    }

    private class TestException : Exception()
}