1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
// RUN: %target-run-simple-swift(%import-libdispatch)
// REQUIRES: executable_test
// REQUIRES: libdispatch
// REQUIRES: synchronization
import Synchronization
import StdlibUnittest
import Dispatch
// Test suite that the test cases below are registered on via `suite.test`.
let suite = TestSuite("LockFreeSingleConsumerStackTests")
/// A stack stored as a singly linked list of manually allocated nodes whose
/// top pointer is updated with atomic compare-exchange operations.
///
/// `push(_:)` may be called concurrently from an arbitrary number of threads.
/// `pop()` does not support overlapping calls; an atomic counter is used to
/// trap when a second call starts while another is still in progress.
@available(SwiftStdlib 6.0, *)
class LockFreeSingleConsumerStack<Element> {
  /// One link of the list: the stored element and the node beneath it.
  struct Node {
    let value: Element
    var next: UnsafeMutablePointer<Node>?
  }
  typealias NodePtr = UnsafeMutablePointer<Node>

  /// The topmost node, or `nil` when the stack is empty.
  private let _last = Atomic<NodePtr?>(nil)
  /// Number of `pop()` calls currently executing.
  private let _consumerCount = Atomic<Int>(0)

  deinit {
    // Discard remaining nodes; each successful `pop()` deallocates one node.
    while pop() != nil {}
  }

  /// Pushes the given element to the top of the stack.
  ///
  /// It is okay to concurrently call this in an arbitrary number of threads.
  ///
  /// - Parameter value: The element to store in a newly allocated node.
  func push(_ value: Element) {
    let new = NodePtr.allocate(capacity: 1)
    new.initialize(to: Node(value: value, next: nil))

    var current = _last.load(ordering: .relaxed)
    while true {
      // Link the new node on top of the most recently observed top node,
      // then try to publish it as the new top.
      new.pointee.next = current
      let (exchanged, original) = _last.compareExchange(
        expected: current,
        desired: new,
        ordering: .releasing)
      if exchanged { return }
      // Another update got in first; retry against the value actually found.
      current = original
    }
  }

  /// Pops and returns the topmost element from the stack.
  ///
  /// This method does not support multiple overlapping concurrent calls.
  ///
  /// - Returns: The most recently pushed element, or `nil` if the stack is
  ///   empty.
  /// - Precondition: No other call to `pop()` is in progress.
  func pop() -> Element? {
    // Increment outside of `precondition`, so that the counter update does not
    // depend on whether the build configuration evaluates the condition; the
    // deferred decrement below always runs.
    let activeConsumers =
      _consumerCount.wrappingAdd(1, ordering: .acquiring).oldValue
    precondition(activeConsumers == 0, "Multiple consumers detected")
    defer { _consumerCount.wrappingSubtract(1, ordering: .releasing) }

    var current = _last.load(ordering: .acquiring)
    while let top = current {
      // Try to replace the top node with the node beneath it.
      let (exchanged, original) = _last.compareExchange(
        expected: top,
        desired: top.pointee.next,
        ordering: .acquiring)
      if exchanged {
        // The node is unlinked: move its contents out and free its memory.
        let node = top.move()
        top.deallocate()
        return node.value
      }
      // The top changed in the meantime; retry with the value actually found.
      current = original
    }
    return nil
  }
}
if #available(SwiftStdlib 6.0, *) {

  // Single-threaded sanity check: elements come back in LIFO order and an
  // empty stack yields nil.
  suite.test("basics") {
    let stack = LockFreeSingleConsumerStack<Int>()
    expectNil(stack.pop())

    stack.push(0)
    expectEqual(0, stack.pop())

    for element in 1 ... 4 {
      stack.push(element)
    }
    expectEqual(4, stack.pop())
    expectEqual(3, stack.pop())
    expectEqual(2, stack.pop())
    expectEqual(1, stack.pop())
    expectNil(stack.pop())
  }

  // Many producers push concurrently; afterwards the stack is drained on the
  // current thread and each producer's values must appear in reverse push
  // order.
  suite.test("concurrentPushes") {
    let stack = LockFreeSingleConsumerStack<(thread: Int, value: Int)>()
    let producerCount = 100
    let valuesPerProducer = 10_000

    DispatchQueue.concurrentPerform(iterations: producerCount) { producer in
      for value in 1 ... valuesPerProducer {
        stack.push((thread: producer, value: value))
      }
    }

    // For each producer, the next value we expect to pop from it.
    var nextExpected = [Int](repeating: valuesPerProducer, count: producerCount)
    while let item = stack.pop() {
      expectEqual(nextExpected[item.thread], item.value)
      nextExpected[item.thread] -= 1
    }
    expectEqual([Int](repeating: 0, count: producerCount), nextExpected)
  }

  // Producers push concurrently while a single consumer on a serial queue pops
  // until it has received every pushed element, summing values per producer.
  suite.test("concurrentPushesAndPops") {
    let stack = LockFreeSingleConsumerStack<(thread: Int, value: Int)>()
    let producerCount = 100
    let valuesPerProducer = 10_000
    let totalItems = producerCount * valuesPerProducer
    var sums = [Int](repeating: 0, count: producerCount)

    let consumerQueue = DispatchQueue(label: "org.swift.background")
    consumerQueue.async {
      var received = 0
      while received < totalItems {
        // Note: busy wait
        guard let item = stack.pop() else { continue }
        sums[item.thread] += item.value
        received += 1
      }
    }

    DispatchQueue.concurrentPerform(iterations: producerCount + 1) { index in
      // Only the first `producerCount` iterations act as producers.
      guard index < producerCount else { return }
      for value in 0 ..< valuesPerProducer {
        stack.push((thread: index, value: value))
      }
    }

    // Each producer pushed 0 ..< valuesPerProducer, so this is its total.
    let sumPerProducer = valuesPerProducer * (valuesPerProducer - 1) / 2
    // The queue is serial, so this block runs after the consumer block ends.
    consumerQueue.sync {
      expectEqual([Int](repeating: sumPerProducer, count: producerCount), sums)
    }
  }

} // if #available(SwiftStdlib 6.0)

runAllTests()
|