1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
/*
This source file is part of the Swift.org open source project
Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
Licensed under Apache License v2.0 with Runtime Library Exception
See http://swift.org/LICENSE.txt for license information
See http://swift.org/CONTRIBUTORS.txt for Swift project authors
*/
@testable import TSCBasic
import XCTest
/// Tests that drive a `SynchronizedQueue` from producer and consumer threads.
///
/// Every test uses a queue of `Int?`. A producer enqueues its integers followed by a single `nil`,
/// and a consumer keeps dequeuing until the first `nil` it receives.
class SyncronizedQueueTests: XCTestCase {

    /// One producer and one consumer: every enqueued integer must show up in the consumed set.
    func testSingleProducerConsumer() {
        let queue = SynchronizedQueue<Int?>()
        let expected = Set(0..<10)
        var received = Set<Int>()

        let producer = Thread {
            expected.forEach { queue.enqueue($0) }
            // The trailing `nil` is what ends the consumer's loop.
            queue.enqueue(nil)
        }

        let consumer = Thread {
            while let value = queue.dequeue() {
                received.insert(value)
            }
        }

        consumer.start()
        producer.start()

        consumer.join()
        producer.join()

        XCTAssertEqual(received, expected)
    }

    /// Two producers and two consumers sharing one queue; the union of both batches must be consumed.
    func testMultipleProducerConsumer() {
        let queue = SynchronizedQueue<Int?>()
        let firstBatch = Set(0..<100)
        let secondBatch = Set(100..<500)
        let expected = firstBatch.union(secondBatch)

        // Both consumers insert into `received`, so every insertion goes through `receivedLock`.
        var received = Set<Int>()
        let receivedLock = NSLock()

        // One producer per batch; each one finishes by enqueuing a `nil`.
        let producers = [firstBatch, secondBatch].map { batch in
            Thread {
                batch.forEach { queue.enqueue($0) }
                queue.enqueue(nil)
            }
        }

        // Two consumers; each one runs until it dequeues a `nil`.
        let consumers = (0..<2).map { _ in
            Thread {
                while let value = queue.dequeue() {
                    receivedLock.withLock {
                        _ = received.insert(value)
                    }
                }
            }
        }

        for consumer in consumers { consumer.start() }
        for producer in producers { producer.start() }

        // Wait for every thread to finish before inspecting the result.
        for consumer in consumers { consumer.join() }
        for producer in producers { producer.join() }

        // Nothing may have been lost along the way.
        XCTAssertEqual(received, expected)
    }

    /// Stress test for the queue. A producer may enqueue an element only after the previously
    /// enqueued element has been consumed, so the consumers repeatedly find nothing new to
    /// dequeue until a producer is signaled and produces again.
    func testMultipleProducerConsumer2() {
        let queue = SynchronizedQueue<Int?>()
        let firstBatch = Set(0..<1000)
        let secondBatch = Set(1000..<5000)
        let expected = firstBatch.union(secondBatch)
        var received = Set<Int>()

        // `mayProduce` and `received` are only touched inside `productionGate.whileLocked`.
        let productionGate = Condition()
        // Producing is allowed at the start.
        var mayProduce = true

        // One producer per batch.
        let producers = [firstBatch, secondBatch].map { batch in
            Thread {
                for value in batch {
                    productionGate.whileLocked {
                        // Wait until a consumer has flipped the flag back to `true`.
                        while mayProduce == false {
                            productionGate.wait()
                        }
                        // Exactly one element is produced; the next one has to wait until
                        // it's consumed.
                        mayProduce = false
                        queue.enqueue(value)
                    }
                }
                // The terminating `nil` is enqueued outside the gate.
                queue.enqueue(nil)
            }
        }

        // Two consumers; each one runs until it dequeues a `nil`.
        let consumers = (0..<2).map { _ in
            Thread {
                while let value = queue.dequeue() {
                    productionGate.whileLocked {
                        received.insert(value)
                        // The element is consumed, so production may resume.
                        mayProduce = true
                        productionGate.signal()
                    }
                }
            }
        }

        for consumer in consumers { consumer.start() }
        for producer in producers { producer.start() }

        // Wait for every thread to finish before inspecting the result.
        for consumer in consumers { consumer.join() }
        for producer in producers { producer.join() }

        // Nothing may have been lost along the way.
        XCTAssertEqual(received, expected)
    }
}
|