1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
|
// RUN: %target-swift-frontend -disable-availability-checking %s -emit-sil -o /dev/null -verify
// RUN: %target-swift-frontend -disable-availability-checking %s -emit-sil -o /dev/null -verify -strict-concurrency=targeted
// RUN: %target-swift-frontend -disable-availability-checking %s -emit-sil -o /dev/null -verify -strict-concurrency=complete -verify-additional-prefix tns-
// REQUIRES: concurrency
// REQUIRES: asserts
// REQUIRES: libdispatch
@available(SwiftStdlib 5.1, *)
func asyncFunc() async -> Int { 42 }
@available(SwiftStdlib 5.1, *)
func asyncThrowsFunc() async throws -> Int { 42 }
@available(SwiftStdlib 5.1, *)
func asyncThrowsOnCancel() async throws -> Int {
// terrible suspend-spin-loop -- do not do this
// only for purposes of demonstration
while Task.isCancelled {
try? await Task.sleep(nanoseconds: 1_000_000_000)
}
throw CancellationError()
}
@available(SwiftStdlib 5.1, *)
func test_taskGroup_add() async throws -> Int {
try await withThrowingTaskGroup(of: Int.self) { group in
group.addTask {
await asyncFunc()
}
group.addTask {
await asyncFunc()
}
var sum = 0
while let v = try await group.next() {
sum += v
}
return sum
} // implicitly awaits
}
// ==== ------------------------------------------------------------------------
// MARK: Example group Usages
struct Boom: Error {}
@available(SwiftStdlib 5.1, *)
func work() async -> Int { 42 }
@available(SwiftStdlib 5.1, *)
func boom() async throws -> Int { throw Boom() }
@available(SwiftStdlib 5.1, *)
func first_allMustSucceed() async throws {
let first: Int = try await withThrowingTaskGroup(of: Int.self) { group in
group.addTask { await work() }
group.addTask { await work() }
group.addTask { try await boom() }
if let first = try await group.next() {
return first
} else {
fatalError("Should never happen, we either throw, or get a result from any of the tasks")
}
// implicitly await: boom
}
_ = first
// Expected: re-thrown Boom
}
@available(SwiftStdlib 5.1, *)
func first_ignoreFailures() async throws {
@Sendable func work() async -> Int { 42 }
@Sendable func boom() async throws -> Int { throw Boom() }
let first: Int = try await withThrowingTaskGroup(of: Int.self) { group in
group.addTask { await work() }
group.addTask { await work() }
group.addTask {
do {
return try await boom()
} catch {
return 0 // TODO: until try? await works properly
}
}
var result: Int = 0
while let v = try await group.next() {
result = v
if result != 0 {
break
}
}
return result
}
_ = first
// Expected: re-thrown Boom
}
// ==== ------------------------------------------------------------------------
// MARK: Advanced Custom Task Group Usage
@available(SwiftStdlib 5.1, *)
func test_taskGroup_quorum_thenCancel() async {
// imitates a typical "gather quorum" routine that is typical in distributed systems programming
enum Vote {
case yay
case nay
}
struct Follower: Sendable {
init(_ name: String) {}
func vote() async throws -> Vote {
// "randomly" vote yes or no
return .yay
}
}
/// Performs a simple quorum vote among the followers.
///
/// - Returns: `true` iff `N/2 + 1` followers return `.yay`, `false` otherwise.
func gatherQuorum(followers: [Follower]) async -> Bool {
try! await withThrowingTaskGroup(of: Vote.self) { group in
for follower in followers {
group.addTask { try await follower.vote() }
}
defer {
group.cancelAll()
}
var yays: Int = 0
var nays: Int = 0
let quorum = Int(followers.count / 2) + 1
while let vote = try await group.next() {
switch vote {
case .yay:
yays += 1
if yays >= quorum {
// cancel all remaining voters, we already reached quorum
return true
}
case .nay:
nays += 1
if nays >= quorum {
return false
}
}
}
return false
}
}
_ = await gatherQuorum(followers: [Follower("A"), Follower("B"), Follower("C")])
}
// FIXME: this is a workaround since (A, B) today isn't inferred to be Sendable
// and causes an error, but should be a warning (this year at least)
@available(SwiftStdlib 5.1, *)
struct SendableTuple2<A: Sendable, B: Sendable>: Sendable {
let first: A
let second: B
init(_ first: A, _ second: B) {
self.first = first
self.second = second
}
}
@available(SwiftStdlib 5.1, *)
extension Collection where Self: Sendable, Element: Sendable, Self.Index: Sendable {
/// Just another example of how one might use task groups.
func map<T: Sendable>(
parallelism requestedParallelism: Int? = nil/*system default*/,
// ordered: Bool = true, /
_ transform: @Sendable (Element) async throws -> T // expected-note {{parameter 'transform' is implicitly non-escaping}}
) async throws -> [T] { // TODO: can't use rethrows here, maybe that's just life though; rdar://71479187 (rethrows is a bit limiting with async functions that use task groups)
let defaultParallelism = 2
let parallelism = requestedParallelism ?? defaultParallelism
let n = self.count
if n == 0 {
return []
}
return try await withThrowingTaskGroup(of: SendableTuple2<Int, T>.self) { group in
var result = ContiguousArray<T>()
result.reserveCapacity(n)
var i = self.startIndex
var submitted = 0
func submitNext() async throws {
// The reason that we emit an error here is b/c we capture the var box
// to i and that is task isolated. This is the region isolation version
// of the 'escaping closure captures non-escaping parameter' error.
//
// TODO: When we have isolation history, isolation history will be able
// to tell us what is going on.
group.addTask { [submitted,i] in // expected-error {{escaping closure captures non-escaping parameter 'transform'}}
let _ = try await transform(self[i]) // expected-note {{captured here}}
let value: T? = nil
return SendableTuple2(submitted, value!)
}
submitted += 1
formIndex(after: &i)
}
// submit first initial tasks
for _ in 0..<parallelism {
try await submitNext()
}
while let tuple = try await group.next() {
let index = tuple.first
let taskResult = tuple.second
result[index] = taskResult
try Task.checkCancellation()
try await submitNext()
}
assert(result.count == n)
return Array(result)
}
}
}
|