1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.channels
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.test.*
class InvokeOnCloseStressTest : TestBase(), CoroutineScope {
private val iterations = 1000 * stressTestMultiplier
private val pool = newFixedThreadPoolContext(3, "InvokeOnCloseStressTest")
override val coroutineContext: CoroutineContext
get() = pool
@After
fun tearDown() {
pool.close()
}
@Test
fun testInvokedExactlyOnce() = runBlocking {
runStressTest(TestChannelKind.ARRAY_1)
}
@Test
fun testInvokedExactlyOnceBroadcast() = runBlocking {
runStressTest(TestChannelKind.CONFLATED_BROADCAST)
}
private suspend fun runStressTest(kind: TestChannelKind) {
repeat(iterations) {
val counter = AtomicInteger(0)
val channel = kind.create()
val latch = CountDownLatch(1)
val j1 = async {
latch.await()
channel.close()
}
val j2 = async {
latch.await()
channel.invokeOnClose { counter.incrementAndGet() }
}
val j3 = async {
latch.await()
channel.invokeOnClose { counter.incrementAndGet() }
}
latch.countDown()
joinAll(j1, j2, j3)
assertEquals(1, counter.get())
}
}
}
|