File: BroadcastChannelSubStressTest.kt

package info (click to toggle)
kotlinx-coroutines 1.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 4,628 kB
  • sloc: xml: 418; sh: 322; javascript: 60; makefile: 17; java: 8
file content (70 lines) | stat: -rw-r--r-- 2,422 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import org.junit.*
import org.junit.runner.*
import org.junit.runners.*
import java.util.concurrent.atomic.*

/**
 * Creates a broadcast channel and repeatedly opens new subscription, receives event, closes it,
 * to stress test the logic of opening the subscription
 * to broadcast channel while events are being concurrently sent to it.
 */
@RunWith(Parameterized::class)
class BroadcastChannelSubStressTest(
    private val kind: TestBroadcastChannelKind
) : TestBase() {
    companion object {
        @Parameterized.Parameters(name = "{0}")
        @JvmStatic
        fun params(): Collection<Array<Any>> =
            TestBroadcastChannelKind.values().map { arrayOf<Any>(it) }
    }

    private val nSeconds = 5 * stressTestMultiplier
    private val broadcast = kind.create<Long>()

    private val sentTotal = AtomicLong()
    private val receivedTotal = AtomicLong()

    @Test
    fun testStress() = runBlocking {
        println("--- BroadcastChannelSubStressTest $kind")
        val sender =
            launch(context = Dispatchers.Default + CoroutineName("Sender")) {
                while (isActive) {
                    broadcast.send(sentTotal.incrementAndGet())
                }
            }
        val receiver =
            launch(context = Dispatchers.Default + CoroutineName("Receiver")) {
                var last = -1L
                while (isActive) {
                    val channel = broadcast.openSubscription()
                    val i = channel.receive()
                    check(i >= last) { "Last was $last, got $i" }
                    if (!kind.isConflated) check(i != last) { "Last was $last, got it again" }
                    receivedTotal.incrementAndGet()
                    last = i
                    channel.cancel()
                }
            }
        var prevSent = -1L
        repeat(nSeconds) { sec ->
            delay(1000)
            val curSent = sentTotal.get()
            println("${sec + 1}: Sent $curSent, received ${receivedTotal.get()}")
            check(curSent > prevSent) { "Send stalled at $curSent events" }
            prevSent = curSent
        }
        withTimeout(5000) {
            sender.cancelAndJoin()
            receiver.cancelAndJoin()
        }
    }
}