File: ChannelAtomicCancelStressTest.kt

package info (click to toggle)
kotlinx-coroutines 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 4,628 kB
  • sloc: xml: 418; sh: 322; javascript: 60; makefile: 17; java: 8
file content (148 lines) | stat: -rw-r--r-- 4,815 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import org.junit.*
import org.junit.Assert.*
import org.junit.runner.*
import org.junit.runners.*
import java.util.*
import java.util.concurrent.atomic.*

/**
 * Tests cancel atomicity for channel send & receive operations, including their select versions.
 */
@RunWith(Parameterized::class)
class ChannelAtomicCancelStressTest(private val kind: TestChannelKind) : TestBase() {
    companion object {
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> = TestChannelKind.values().map { arrayOf<Any>(it) }
    }

    private val TEST_DURATION = 3000L * stressTestMultiplier

    val channel = kind.create()
    private val senderDone = Channel<Boolean>(1)
    private val receiverDone = Channel<Boolean>(1)

    private var lastSent = 0
    private var lastReceived = 0

    private var stoppedSender = 0
    private var stoppedReceiver = 0

    private var missedCnt = 0
    private var dupCnt = 0

    val failed = AtomicReference<Throwable>()

    lateinit var sender: Job
    lateinit var receiver: Job

    fun fail(e: Throwable) = failed.compareAndSet(null, e)

    private inline fun cancellable(done: Channel<Boolean>, block: () -> Unit) {
        try {
            block()
        } catch (e: Throwable) {
            if (e !is CancellationException) fail(e)
            throw e
        } finally {
            if (!done.offer(true))
                fail(IllegalStateException("failed to offer to done channel"))
        }
    }

    @Test
    fun testAtomicCancelStress() = runBlocking {
        println("--- ChannelAtomicCancelStressTest $kind")
        val deadline = System.currentTimeMillis() + TEST_DURATION
        launchSender()
        launchReceiver()
        val rnd = Random()
        while (System.currentTimeMillis() < deadline && failed.get() == null) {
            when (rnd.nextInt(3)) {
                0 -> { // cancel & restart sender
                    stopSender()
                    launchSender()
                }
                1 -> { // cancel & restart receiver
                    stopReceiver()
                    launchReceiver()
                }
                2 -> yield() // just yield (burn a little time)
            }
        }
        stopSender()
        stopReceiver()
        println("            Sent $lastSent ints to channel")
        println("        Received $lastReceived ints from channel")
        println("  Stopped sender $stoppedSender times")
        println("Stopped receiver $stoppedReceiver times")
        println("          Missed $missedCnt ints")
        println("      Duplicated $dupCnt ints")
        failed.get()?.let { throw it }
        assertEquals(0, dupCnt)
        if (!kind.isConflated) {
            assertEquals(0, missedCnt)
            assertEquals(lastSent, lastReceived)
        }
    }

    private fun launchSender() {
        sender = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
            val rnd = Random()
            cancellable(senderDone) {
                var counter = 0
                while (true) {
                    val trySend = lastSent + 1
                    when (rnd.nextInt(2)) {
                        0 -> channel.send(trySend)
                        1 -> select { channel.onSend(trySend) {} }
                        else -> error("cannot happen")
                    }
                    lastSent = trySend // update on success
                    if (counter++ % 1000 == 0) yield() // yield periodically to check cancellation on LinkedListChannel
                }
            }
        }
    }

    private suspend fun stopSender() {
        stoppedSender++
        sender.cancel()
        senderDone.receive()
    }

    private fun launchReceiver() {
        receiver = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
            val rnd = Random()
            cancellable(receiverDone) {
                while (true) {
                    val received = when (rnd.nextInt(2)) {
                        0 -> channel.receive()
                        1 -> select { channel.onReceive { it } }
                        else -> error("cannot happen")
                    }
                    val expected = lastReceived + 1
                    if (received > expected)
                        missedCnt++
                    if (received < expected)
                        dupCnt++
                    lastReceived = received
                }
            }
        }
    }

    private suspend fun stopReceiver() {
        stoppedReceiver++
        receiver.cancel()
        receiverDone.receive()
    }
}