1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
|
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import org.hamcrest.MatcherAssert.*
import org.hamcrest.core.*
import org.junit.*
import java.util.concurrent.atomic.*
class ConflatedBroadcastChannelNotifyStressTest : TestBase() {
private val nSenders = 2
private val nReceivers = 3
private val nEvents = 500_000 * stressTestMultiplier
private val timeLimit = 30_000L * stressTestMultiplier // 30 sec
private val broadcast = ConflatedBroadcastChannel<Int>()
private val sendersCompleted = AtomicInteger()
private val receiversCompleted = AtomicInteger()
private val sentTotal = AtomicInteger()
private val receivedTotal = AtomicInteger()
@Test
fun testStressNotify()= runBlocking {
println("--- ConflatedBroadcastChannelNotifyStressTest")
val senders = List(nSenders) { senderId ->
launch(Dispatchers.Default + CoroutineName("Sender$senderId")) {
repeat(nEvents) { i ->
if (i % nSenders == senderId) {
broadcast.offer(i)
sentTotal.incrementAndGet()
yield()
}
}
sendersCompleted.incrementAndGet()
}
}
val receivers = List(nReceivers) { receiverId ->
launch(Dispatchers.Default + CoroutineName("Receiver$receiverId")) {
var last = -1
while (isActive) {
val i = waitForEvent()
if (i > last) {
receivedTotal.incrementAndGet()
last = i
}
if (i >= nEvents) break
yield()
}
receiversCompleted.incrementAndGet()
}
}
// print progress
val progressJob = launch {
var seconds = 0
while (true) {
delay(1000)
println("${++seconds}: Sent ${sentTotal.get()}, received ${receivedTotal.get()}")
}
}
try {
withTimeout(timeLimit) {
senders.forEach { it.join() }
broadcast.offer(nEvents) // last event to signal receivers termination
receivers.forEach { it.join() }
}
} catch (e: CancellationException) {
println("!!! Test timed out $e")
}
progressJob.cancel()
println("Tested with nSenders=$nSenders, nReceivers=$nReceivers")
println("Completed successfully ${sendersCompleted.get()} sender coroutines")
println("Completed successfully ${receiversCompleted.get()} receiver coroutines")
println(" Sent ${sentTotal.get()} events")
println(" Received ${receivedTotal.get()} events")
assertThat(sendersCompleted.get(), IsEqual(nSenders))
assertThat(receiversCompleted.get(), IsEqual(nReceivers))
assertThat(sentTotal.get(), IsEqual(nEvents))
}
private suspend fun waitForEvent(): Int =
with(broadcast.openSubscription()) {
val value = receive()
cancel()
value
}
}
|