1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330
|
// RUN: %empty-directory(%t)
// RUN: %target-build-swift %s -Xfrontend -disable-availability-checking -parse-as-library -o %t/async_task_priority
// RUN: %target-codesign %t/async_task_priority
// RUN: %target-run %t/async_task_priority
// REQUIRES: VENDOR=apple
// REQUIRES: executable_test
// REQUIRES: concurrency
// REQUIRES: libdispatch
// rdar://76038845
// REQUIRES: concurrency_runtime
// UNSUPPORTED: back_deployment_runtime
// UNSUPPORTED: back_deploy_concurrency
// rdar://101077408 – Temporarily disable on watchOS, iOS & tvOS simulators
// UNSUPPORTED: DARWIN_SIMULATOR=watchos
// UNSUPPORTED: DARWIN_SIMULATOR=ios
// UNSUPPORTED: DARWIN_SIMULATOR=tvos
// rdar://107390341 - Temporarily disable for arm64e
// UNSUPPORTED: CPU=arm64e
import Darwin
@preconcurrency import Dispatch
import StdlibUnittest
/// Suspends the calling task until `Task.currentPriority` equals `priority`.
///
/// The current priority is polled in a loop; every mismatch is logged and
/// followed by a `Task.sleep` before the next poll.
///
/// - Parameter priority: The priority the current task is expected to reach.
func loopUntil(priority: TaskPriority) async {
  while true {
    // Sample once per iteration so the logged value is the one that was compared.
    let observed = Task.currentPriority
    guard observed != priority else { return }
    print("Current priority = \(observed) != \(priority)")
    await Task.sleep(1_000_000)
  }
}
/// Writes `s` followed by a newline to standard error.
///
/// - Parameter s: The text to emit; defaults to the empty string.
func print(_ s: String = "") {
  let line = s + "\n"
  fputs(line, stderr)
}
/// Verifies that the current task's base priority equals `priority`.
///
/// The base priority is read through `Task.basePriority` and again through the
/// `UnsafeCurrentTask` handle; both readings are checked with `expectEqual`.
///
/// `Task.basePriority` is force-unwrapped and a missing current task is a
/// fatal error, so this must only be called where a current task exists.
///
/// - Parameter priority: The base priority the caller expects.
/// - Returns: The base priority read from `Task.basePriority`. The result is
///   discardable because several callers only want the checks.
@discardableResult
func expectedBasePri(priority: TaskPriority) -> TaskPriority {
  let basePri = Task.basePriority!
  print("Testing basePri matching expected pri - \(basePri) == \(priority)")
  expectEqual(basePri, priority)

  withUnsafeCurrentTask { unsafeTask in
    guard let unsafeTask else {
      fatalError("Expected to be able to get current task, but could not!")
    }
    // The UnsafeCurrentTask must return the same value
    expectEqual(basePri, unsafeTask.basePriority)
  }

  return basePri
}
/// Verifies that the current task's (possibly escalated) priority equals
/// `priority`.
///
/// Reads `Task.currentPriority`, logs the comparison, and checks it with
/// `expectEqual`.
///
/// - Parameter priority: The current priority the caller expects.
/// - Returns: The priority read from `Task.currentPriority`. The result is
///   discardable because several callers only want the check.
@discardableResult
func expectedEscalatedPri(priority: TaskPriority) -> TaskPriority {
  let curPri = Task.currentPriority
  print("Testing escalated matching expected pri - \(curPri) == \(priority)")
  expectEqual(curPri, priority)
  return curPri
}
/// Checks both the base and the current priority of the task it runs in.
///
/// - Parameters:
///   - basePri: The base priority the running task is expected to have.
///   - curPri: The current priority the running task is expected to have.
func testNestedTaskPriority(basePri: TaskPriority, curPri: TaskPriority) async {
  // Base priority first, then current priority; the returned values are not needed.
  _ = expectedBasePri(priority: basePri)
  _ = expectedEscalatedPri(priority: curPri)
}
/// Blocks on `sem`, then checks the base and current priority of the task it
/// runs in.
///
/// - Parameters:
///   - sem: Semaphore the caller signals once the priority checks should run.
///   - basePri: The base priority expected after the wait.
///   - curPri: The current priority expected after the wait.
func childTaskWaitingForEscalation(sem: DispatchSemaphore, basePri: TaskPriority, curPri : TaskPriority) async {
  // Wait to be escalated
  sem.wait()
  await testNestedTaskPriority(basePri: basePri, curPri: curPri)
}
/// Counter actor used to hold one task on the actor while another is started
/// behind it.
actor Test {
  private var value = 0

  init() { }

  /// Returns the counter's value from before the call and advances it by one.
  func increment() -> Int {
    defer { value += 1 }
    return value
  }

  /// Signals `semToSignal`, blocks on `semToWait`, sleeps for one second, and
  /// then increments the counter.
  ///
  /// - Parameters:
  ///   - semToSignal: Signalled as soon as this method starts running.
  ///   - semToWait: Waited on before the sleep and the increment.
  ///   - priExpected: Currently unused; see the TODO below.
  /// - Returns: The counter value from before the increment.
  func blockActorThenIncrement(semToSignal: DispatchSemaphore, semToWait : DispatchSemaphore, priExpected: TaskPriority) -> Int {
    semToSignal.signal()
    semToWait.wait()
    sleep(1)

    // TODO: insert a test to verify that thread priority has actually escalated
    // to match priExpected
    return increment()
  }
}
/// Entry point of the task-priority test executable.
///
/// Registers the "Task Priority manipulations" suite, runs it from inside a
/// detached task, and waits for that task to finish.
@main struct Main {
  static func main() async {
    // `Task.detached` replaces the deprecated global `detach` function.
    let top_level = Task.detached { /* To detach from main actor when running work */

      let tests = TestSuite("Task Priority manipulations")
      if #available(SwiftStdlib 5.1, *) {

        tests.test("Basic escalation test when task is running") {
          let parentPri = Task.currentPriority

          let sem = DispatchSemaphore(value: 0)
          let task = Task(priority: .background) {
            let _ = expectedBasePri(priority: .background)

            // Wait until task is running before asking to be escalated
            sem.signal()
            sleep(1)

            let _ = expectedEscalatedPri(priority: parentPri)
          }

          // Wait till child runs and asks to be escalated
          sem.wait()
          await task.value
        }

        tests.test("Basic escalation when task is suspended") {
          let parentPri = Task.currentPriority

          let task = Task(priority: .background) {
            await loopUntil(priority: parentPri) /* Suspend task until it is escalated */

            let _ = expectedEscalatedPri(priority: parentPri)
          }
          await task.value // Escalate task BG -> DEF
        }

        tests.test("Structured concurrency priority propagation") {
          let task = Task(priority: .background) {
            await loopUntil(priority: .default)

            let basePri = expectedBasePri(priority: .background)
            let curPri = expectedEscalatedPri(priority: .default)

            // Structured concurrency via async let, escalated priority of
            // parent should propagate
            print("Testing propagation for async let structured concurrency child")
            async let child = testNestedTaskPriority(basePri: basePri, curPri: curPri)
            await child

            let dispatchGroup = DispatchGroup()
            // Structured concurrency via task groups, escalated priority should
            // propagate
            await withTaskGroup(of: Void.self, returning: Void.self) { group in
              dispatchGroup.enter()
              group.addTask {
                print("Testing propagation for task group regular child")
                let _ = await testNestedTaskPriority(basePri: basePri, curPri: curPri)
                dispatchGroup.leave()
                return
              }

              dispatchGroup.enter()
              group.addTask(priority: .utility) {
                print("Testing propagation for task group child with specified priority")
                let _ = await testNestedTaskPriority(basePri: .utility, curPri: curPri)
                dispatchGroup.leave()
                return
              }

              // Wait for child tasks to finish running, don't await since that
              // will escalate them
              dispatchGroup.wait()
            }
          }

          await task.value // Escalate task BG->DEF
        }

        tests.test("Unstructured tasks priority propagation") {
          let task = Task(priority : .background) {
            await loopUntil(priority: .default)

            let basePri = expectedBasePri(priority: .background)
            let _ = expectedEscalatedPri(priority: .default)

            let group = DispatchGroup()

            // Create an unstructured task
            group.enter()
            let _ = Task {
              let _ = await testNestedTaskPriority(basePri: basePri, curPri: basePri)
              group.leave()
              return
            }

            // Wait for unstructured task to finish running, don't await it
            // since that will escalate
            group.wait()
          }

          await task.value // Escalate task BG->DEF
        }

        tests.test("Task escalation propagation to SC children") {
          // Create a task tree and then escalate the parent
          let parentPri = Task.currentPriority
          let basePri : TaskPriority = .background

          let sem = DispatchSemaphore(value: 0)
          let sem2 = DispatchSemaphore(value : 0)

          let task = Task(priority: basePri) {
            async let child = childTaskWaitingForEscalation(sem: sem2, basePri: basePri, curPri: parentPri)

            await withTaskGroup(of: Void.self, returning: Void.self) { group in
              group.addTask {
                let _ = await childTaskWaitingForEscalation(sem: sem2, basePri: basePri, curPri: parentPri)
              }
              group.addTask(priority: .utility) {
                let _ = await childTaskWaitingForEscalation(sem: sem2, basePri: .utility, curPri: parentPri)
              }

              sem.signal() // Ask for escalation after creating full task tree
              sleep(1)

              let _ = expectedBasePri(priority: basePri)
              let _ = expectedEscalatedPri(priority: parentPri)

              sem2.signal() // Ask async let child to evaluate
              sem2.signal() // Ask task group child 1 to evaluate
              sem2.signal() // Ask task group child 2 to evaluate
            }
          }

          // Wait until children are created and then ask for escalation of
          // top level
          sem.wait()
          await task.value
        }

        tests.test("Simple task escalation to a future") {
          let task1Pri: TaskPriority = .background
          let task2Pri: TaskPriority = .utility
          let parentPri: TaskPriority = Task.currentPriority
          print("Top level task current priority = \(parentPri)")

          // After task2 has suspended waiting for task1, escalating task2
          // should cause task1 to escalate

          let task1 = Task(priority: task1Pri) {
            // Wait until task2 has blocked on task1 and escalated it
            sleep(1)
            let _ = expectedEscalatedPri(priority: task2Pri)

            // Wait until task2 itself has been escalated
            sleep(5)
            let _ = expectedEscalatedPri(priority: parentPri)
          }

          let task2 = Task(priority: task2Pri) {
            await task1.value
          }

          // Wait for task2 and task1 to run and for task2 to now block on
          // task1
          sleep(3)

          await task2.value
        }

        tests.test("Simple task escalation to a future 2") {
          // top level task -> unstructured task2 -> child task -> unstructured
          // task1
          let task1Pri: TaskPriority = .background
          let task2Pri: TaskPriority = .utility
          let parentPri: TaskPriority = Task.currentPriority
          print("Top level task current priority = \(parentPri)")

          let task1 = Task(priority: task1Pri) {
            await loopUntil(priority: parentPri)
          }

          sleep(1) // Wait for task1 to start running

          let task2 = Task(priority: task2Pri) {
            func childTask() async {
              await task1.value
            }
            async let child = childTask()
          }

          sleep(1) // Wait for task2 to start running
          await task2.value
        }

        tests.test("Task escalation of a task enqueued on an actor") {
          let task1Pri: TaskPriority = .background
          let task2Pri: TaskPriority = .background
          let parentPri: TaskPriority = Task.currentPriority

          let sem1 = DispatchSemaphore(value: 0) // to unblock enqueue of task2
          let sem2 = DispatchSemaphore(value: 0)
          let testActor = Test()

          let task1 = Task(priority: task1Pri) {
            let _ = expectedBasePri(priority: task1Pri)
            // The returned counter value is not needed here
            let _ = await testActor.blockActorThenIncrement(semToSignal: sem1, semToWait: sem2, priExpected: parentPri)
          }

          sem1.wait() // Wait until task1 is on the actor

          let task2 = Task(priority: task2Pri) {
            let _ = expectedBasePri(priority: task2Pri)
            let _ = await testActor.increment()
          }

          sleep(1)
          sem2.signal() // task2 is probably enqueued on the actor at this point, unblock task1

          await task2.value // Escalate task2 which should be queued behind task1 on the actor
        }

      }
      await runAllTestsAsync()
    }

    await top_level.value
  }
}
|