1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
|
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Async Algorithms open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
//
//===----------------------------------------------------------------------===//
extension AsyncSequenceValidationDiagram {
/// A specification string together with the `SourceLocation` recorded for it.
///
/// No public initializer is exposed: the compiler-synthesized memberwise
/// initializer `init(specification:location:)` has internal access, so values
/// can only be created from inside this module.
public struct Specification: Sendable {
  /// The raw specification text.
  public let specification: String

  /// The source location associated with `specification`.
  public let location: SourceLocation
}
/// An asynchronous sequence of `String` elements whose emissions are installed
/// by `parse(_:theme:location:)` and delivered through a `WorkQueue`.
///
/// The pending emissions live in shared critical state, so every iterator made
/// from the same `Input` drains the same list.
public struct Input: AsyncSequence, Sendable {
  public typealias Element = String

  /// Mutable storage guarded by `ManagedCriticalState`.
  struct State {
    /// Emissions that have not been handed to an iterator yet, each paired
    /// with the instant it is scheduled for.
    var emissions = [(Clock.Instant, Event)]()
  }

  let state = ManagedCriticalState(State())
  let queue: WorkQueue
  let index: Int

  /// Iterator that takes emissions off the shared list one at a time and
  /// enqueues each of an emission's results on the work queue.
  public struct Iterator: AsyncIteratorProtocol, Sendable {
    let state: ManagedCriticalState<State>
    let queue: WorkQueue
    let index: Int
    /// The emission currently being delivered, if it still has results left
    /// after the one in flight.
    var active: (Clock.Instant, [Result<String?, Error>])?
    /// Position within the active emission's results of the next result to deliver.
    var eventIndex = 0

    /// Enqueues `results[eventIndex]` on the work queue with deadline `when`
    /// and suspends until the continuation handed to the queue is resumed.
    ///
    /// - Parameters:
    ///   - when: The deadline passed to the queue for this result.
    ///   - results: All results of the emission being delivered.
    /// - Returns: The value the continuation is resumed with.
    /// - Throws: The error the continuation is resumed with.
    mutating func apply(when: Clock.Instant, results: [Result<String?, Error>]) async throws -> Element? {
      let token = queue.prepare()
      // This call delivers the last remaining result, so retire the emission
      // now; the `defer` below uses `active` to decide how to move the cursor.
      if eventIndex + 1 >= results.count {
        active = nil
      }
      defer {
        // Runs after the await below returns or throws: step to the next
        // result of the same emission, or rewind for the next emission.
        if active != nil {
          eventIndex += 1
        } else {
          eventIndex = 0
        }
      }
      // NOTE(review): `results[eventIndex]` traps if `results` is empty —
      // confirm that `Event.results` never yields an empty array for a stored emission.
      return try await withTaskCancellationHandler {
        try await withUnsafeThrowingContinuation { continuation in
          queue.enqueue(Context.currentJob, deadline: when, continuation: continuation, results[eventIndex], index: index, token: token)
        }
      } onCancel: { [queue] in
        // Task cancellation is forwarded to the queue using the prepared token.
        queue.cancel(token)
      }
    }

    /// Delivers the next result of the active emission, or takes the next
    /// emission off the shared list and delivers its first result.
    ///
    /// - Returns: The delivered element, or `nil` when no emission is active
    ///   and the shared list is empty.
    public mutating func next() async throws -> Element? {
      // Keep draining an emission that still has undelivered results.
      if let (when, results) = active {
        return try await apply(when: when, results: results)
      }

      let pending = state.withCriticalRegion { state -> (Clock.Instant, Event)? in
        guard !state.emissions.isEmpty else {
          return nil
        }
        return state.emissions.removeFirst()
      }
      guard let emission = pending else {
        return nil
      }

      let when = emission.0
      let results = emission.1.results
      active = (when, results)
      return try await apply(when: when, results: results)
    }
  }

  public func makeAsyncIterator() -> Iterator {
    Iterator(state: state, queue: queue, index: index)
  }

  /// Parses `dsl` with `Event.parse` and replaces the stored emissions with the result.
  ///
  /// - Throws: Whatever `Event.parse` throws; the stored emissions are left
  ///   untouched in that case.
  func parse<Theme: AsyncSequenceValidationTheme>(_ dsl: String, theme: Theme, location: SourceLocation) throws {
    let emissions = try Event.parse(dsl, theme: theme, location: location)
    state.withCriticalRegion { state in
      state.emissions = emissions
    }
  }

  /// The latest instant among the emissions still stored, or `nil` when none remain.
  var end: Clock.Instant? {
    state.withCriticalRegion { state in
      // `max()` finds the latest instant in one pass; no sorted copy is needed.
      state.emissions.lazy.map { $0.0 }.max()
    }
  }
}
/// A random-access collection of `Input` values that share one `WorkQueue`
/// and are created on demand.
///
/// Reading a position at or past `endIndex` grows the list up to and including
/// that position instead of trapping, so `endIndex` reflects how far the list
/// has been subscripted so far.
public struct InputList: RandomAccessCollection, Sendable {
  let state = ManagedCriticalState([Input]())
  let queue: WorkQueue

  public var startIndex: Int { return 0 }

  /// The number of inputs created so far.
  public var endIndex: Int {
    state.withCriticalRegion { $0.count }
  }

  /// Returns the input at `position`, first creating every missing input up to
  /// and including that position.
  ///
  /// Each created input is given its own slot as its `index`, so reading a
  /// later position first no longer stamps the earlier inputs with that
  /// later position.
  public subscript(position: Int) -> AsyncSequenceValidationDiagram.Input {
    get {
      return state.withCriticalRegion { inputs in
        if position >= inputs.count {
          // The range bounds are evaluated once, before any append.
          for slot in inputs.count...position {
            inputs.append(Input(queue: queue, index: slot))
          }
        }
        return inputs[position]
      }
    }
  }
}
}
|