1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
/// Reference-type storage that drives an `UnboundedBufferStateMachine` for `Base`.
///
/// Every state transition is performed inside `ManagedCriticalState.withCriticalRegion`.
/// The state machine only *describes* what has to happen (start the producer task,
/// resume a continuation, cancel the task); this class carries those actions out.
/// Continuations are resumed and the task is cancelled only after the critical
/// region closure has returned.
final class UnboundedBufferStorage<Base: AsyncSequence>: Sendable where Base: Sendable {
  /// The state machine, wrapped so that it is only touched through `withCriticalRegion`.
  private let stateMachine: ManagedCriticalState<UnboundedBufferStateMachine<Base>>

  /// Creates the storage with a fresh state machine for `base` and `policy`.
  ///
  /// - Parameters:
  ///   - base: The sequence that the producer task iterates once it is started.
  ///   - policy: Forwarded unchanged to the state machine's initializer.
  init(base: Base, policy: UnboundedBufferStateMachine<Base>.Policy) {
    let initialState = UnboundedBufferStateMachine<Base>(base: base, policy: policy)
    self.stateMachine = ManagedCriticalState(initialState)
  }

  /// Asks the state machine for the next result, suspending when it has none to hand out.
  ///
  /// If the state machine requests it, the producer task is started first. When the
  /// calling task is cancelled, `interrupted()` is invoked through the cancellation handler.
  ///
  /// - Returns: The result supplied by the state machine or by whoever resumes the
  ///   suspended consumer. A suspended consumer receives `nil` when the base finishes
  ///   without an error or when the storage is interrupted.
  func next() async -> Result<Base.Element, Error>? {
    await withTaskCancellationHandler {
      let pending: UnboundedBufferStateMachine<Base>.NextAction? = self.stateMachine.withCriticalRegion { machine in
        let decision = machine.next()
        if case .startTask(let base) = decision {
          // Start the task while still inside the critical region so that no other
          // input can interleave between the decision and the task being registered.
          self.startTask(stateMachine: &machine, base: base)
          return nil
        }
        return decision
      }

      switch pending {
      case .returnResult(let result):
        return result
      case .startTask:
        // We are handling the startTask in the lock already because we want to avoid
        // other inputs interleaving while starting the task
        fatalError("Internal inconsistency")
      case .suspend, .none:
        // Either the state machine asked us to suspend or the task has just been
        // started; in both cases fall through and wait for a result.
        break
      }

      return await withUnsafeContinuation { (continuation: UnsafeContinuation<Result<Base.Element, Error>?, Never>) in
        let outcome = self.stateMachine.withCriticalRegion { machine in
          machine.nextSuspended(continuation: continuation)
        }
        // The state machine may already have a result for us; if so, resume right away.
        if case .resumeConsumer(let result) = outcome {
          continuation.resume(returning: result)
        }
      }
    } onCancel: {
      self.interrupted()
    }
  }

  /// Spawns the task that iterates `base` and registers it with the state machine.
  ///
  /// Called from `next()` while the critical region is held, which is why the state
  /// machine is passed in as `inout` rather than re-acquired here. The task body itself
  /// goes through `self.stateMachine.withCriticalRegion` for each transition.
  ///
  /// - Parameters:
  ///   - stateMachine: The state machine the caller currently has exclusive access to.
  ///   - base: The sequence to iterate.
  private func startTask(
    stateMachine: inout UnboundedBufferStateMachine<Base>,
    base: Base
  ) {
    let producer = Task {
      do {
        for try await element in base {
          let produced = self.stateMachine.withCriticalRegion { machine in
            machine.elementProduced(element: element)
          }
          if case .resumeConsumer(let consumer, let result) = produced {
            consumer.resume(returning: result)
          }
        }

        // The base ended without throwing.
        let finished = self.stateMachine.withCriticalRegion { machine in
          machine.finish(error: nil)
        }
        if case .resumeConsumer(let consumer) = finished {
          consumer?.resume(returning: nil)
        }
      } catch {
        // The base threw: record the failure and hand it to a waiting consumer, if any.
        let failed = self.stateMachine.withCriticalRegion { machine in
          machine.finish(error: error)
        }
        if case .resumeConsumer(let consumer) = failed {
          consumer?.resume(returning: .failure(error))
        }
      }
    }

    stateMachine.taskStarted(task: producer)
  }

  /// Notifies the state machine of an interruption and applies the resulting action.
  ///
  /// When the state machine hands back a task, that task is cancelled and the
  /// continuation, if one is present, is resumed with `nil`.
  func interrupted() {
    let outcome = self.stateMachine.withCriticalRegion { machine in
      machine.interrupted()
    }

    guard case .resumeConsumer(let task, let continuation) = outcome else {
      return
    }

    task.cancel()
    continuation?.resume(returning: nil)
  }

  deinit {
    // Run the same interruption path as cancellation when the storage is deallocated.
    self.interrupted()
  }
}
|