1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Atomics open source project
//
// Copyright (c) 2020 - 2023 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//
import XCTest
import Atomics
import Dispatch
/// A lock-free stack that accepts pushes from any number of threads
/// concurrently, but supports only one `pop()` call at a time.
///
/// Elements are stored in individually allocated nodes that are linked
/// through raw pointers. The topmost node is tracked by an atomic pointer
/// that is updated with compare-exchange loops.
class LockFreeSingleConsumerStack<Element> {
  /// A single heap-allocated link in the stack.
  struct Node {
    let value: Element
    var next: UnsafeMutablePointer<Node>?
  }
  typealias NodePtr = UnsafeMutablePointer<Node>

  /// The topmost node, or `nil` when the stack is empty.
  private let _last = UnsafeAtomic<NodePtr?>.create(nil)
  /// The number of `pop()` calls currently in progress; used to trap on
  /// overlapping consumers.
  private let _consumerCount = UnsafeAtomic<Int>.create(0)

  deinit {
    // Discard remaining nodes so that their storage is released before the
    // atomic storage itself is destroyed.
    while pop() != nil {}
    _last.destroy()
    _consumerCount.destroy()
  }

  /// Push the given element to the top of the stack.
  ///
  /// It is okay to concurrently call this in an arbitrary number of threads.
  ///
  /// - Parameter value: The element to store in a newly allocated node.
  func push(_ value: Element) {
    let new = NodePtr.allocate(capacity: 1)
    new.initialize(to: Node(value: value, next: nil))

    var done = false
    var current = _last.load(ordering: .relaxed)
    while !done {
      // Link the new node in front of the top we last observed, then try to
      // publish it. On failure `current` is refreshed with the value that was
      // actually found, and the loop retries against it.
      new.pointee.next = current
      (done, current) = _last.compareExchange(
        expected: current,
        desired: new,
        ordering: .releasing)
    }
  }

  /// Pop and return the topmost element from the stack.
  ///
  /// This method does not support multiple overlapping concurrent calls; an
  /// overlapping call fails the precondition below.
  ///
  /// - Returns: The most recently pushed element, or `nil` if the stack was
  ///   observed to be empty.
  func pop() -> Element? {
    precondition(
      _consumerCount.loadThenWrappingIncrement(ordering: .acquiring) == 0,
      "Multiple consumers detected")
    defer { _consumerCount.wrappingDecrement(ordering: .releasing) }

    var current = _last.load(ordering: .acquiring)
    while let c = current {
      let (exchanged, original) = _last.compareExchange(
        expected: c,
        desired: c.pointee.next,
        ordering: .acquiring)
      if exchanged {
        // The node is now unlinked: move its contents out and free it.
        let result = c.move()
        c.deallocate()
        return result.value
      }
      // The top changed between the load and the exchange; retry with the
      // value that was actually found.
      current = original
    }
    return nil
  }
}
/// Tests for `LockFreeSingleConsumerStack`, covering sequential use,
/// concurrent producers, and concurrent producers with a single consumer.
class LockFreeSingleConsumerStackTests: XCTestCase {
  /// Single-threaded check: an empty stack pops `nil`, and pushed elements
  /// come back in last-in, first-out order.
  func test_Basics() {
    let stack = LockFreeSingleConsumerStack<Int>()
    XCTAssertNil(stack.pop())

    stack.push(0)
    XCTAssertEqual(0, stack.pop())

    for value in 1 ... 4 {
      stack.push(value)
    }
    for value in (1 ... 4).reversed() {
      XCTAssertEqual(value, stack.pop())
    }
    XCTAssertNil(stack.pop())
  }

  /// Pushes from many threads at once, then drains the stack on the calling
  /// thread and verifies the per-producer ordering.
  func test_ConcurrentPushes() {
    let stack = LockFreeSingleConsumerStack<(thread: Int, value: Int)>()
    let numThreads = 100
    let numValues = 10_000

    DispatchQueue.concurrentPerform(iterations: numThreads) { thread in
      for value in 1 ... numValues {
        stack.push((thread: thread, value: value))
      }
    }

    // Every producer pushed 1 ... numValues in ascending order, so each
    // producer's values must be popped in descending order.
    var nextExpected = [Int](repeating: numValues, count: numThreads)
    while let item = stack.pop() {
      XCTAssertEqual(nextExpected[item.thread], item.value)
      nextExpected[item.thread] -= 1
    }
    XCTAssertEqual([Int](repeating: 0, count: numThreads), nextExpected)
  }

  /// Runs a single consumer on a background queue while many producers push
  /// concurrently, then checks the sum of the values received per producer.
  func test_ConcurrentPushesAndPops() {
    let stack = LockFreeSingleConsumerStack<(thread: Int, value: Int)>()
    let numThreads = 100
    let numValues = 10_000
    let totalCount = numThreads * numValues
    var perThreadSums = [Int](repeating: 0, count: numThreads)

    let consumerQueue = DispatchQueue(label: "org.swift.background")
    consumerQueue.async {
      var popped = 0
      while popped < totalCount {
        // Note: busy wait -- an empty stack simply retries.
        guard let item = stack.pop() else { continue }
        perThreadSums[item.thread] += item.value
        popped += 1
      }
    }

    DispatchQueue.concurrentPerform(iterations: numThreads + 1) { thread in
      // Only indices below `numThreads` act as producers; the one extra
      // iteration does nothing.
      guard thread < numThreads else { return }
      for value in 0 ..< numValues {
        stack.push((thread: thread, value: value))
      }
    }

    // Submitted to the same queue as the consumer block, so the sums are read
    // on that queue rather than directly from the test thread.
    consumerQueue.sync {
      // Each producer pushed 0 ..< numValues, which sums to n * (n - 1) / 2.
      let expectedSum = numValues * (numValues - 1) / 2
      XCTAssertEqual([Int](repeating: expectedSum, count: numThreads), perThreadSums)
    }
  }

  #if MANUAL_TEST_DISCOVERY
  public static var allTests = [
    ("test_Basics", test_Basics),
    ("test_ConcurrentPushes", test_ConcurrentPushes),
    ("test_ConcurrentPushesAndPops", test_ConcurrentPushesAndPops),
  ]
  #endif
}
|