File: TaskScheduler.swift

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (565 lines) | stat: -rw-r--r-- 26,251 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

import Foundation
import LSPLogging
import SKSupport
import SwiftExtensions

/// Describes how a task that is about to be executed relates to a task that is currently executing.
///
/// See comment on ``TaskDescriptionProtocol/dependencies(to:)``.
public enum TaskDependencyAction<TaskDescription: TaskDescriptionProtocol> {
  /// Wait for the given task to finish before executing, elevating its priority to the priority of the waiting task.
  case waitAndElevatePriorityOfDependency(TaskDescription)
  /// Cancel the given task so that the new task can execute. The cancelled task is re-scheduled to run later.
  case cancelAndRescheduleDependency(TaskDescription)
}

/// Logging subsystem that is passed to `withLoggingSubsystemAndScope` for the log messages emitted by `QueuedTask`
/// and `TaskScheduler`.
private let taskSchedulerSubsystem = "org.swift.sourcekit-lsp.task-scheduler"

/// Describes a unit of work that can be scheduled on a `TaskScheduler`.
///
/// Besides the work itself (`execute`), a task description tells the scheduler how it interacts with tasks that are
/// already running (`dependencies(to:)`), whether it may be cancelled and re-run (`isIdempotent`) and how many CPU
/// cores it is expected to occupy (`estimatedCPUCoreCount`).
public protocol TaskDescriptionProtocol: Identifiable, Sendable, CustomLogStringConvertible {
  /// Execute the task.
  ///
  ///  - Important: This should only be called from `TaskScheduler` and never be called manually.
  func execute() async

  /// When a new task is picked for execution, this determines how the task should behave with respect to the tasks that
  /// are already running.
  ///
  /// Options are the following (see doc comment on `TaskScheduler` for examples):
  ///  1. Not add any `TaskDependencyAction` for a currently executing task. This means that the two tasks can run in
  ///     parallel.
  ///  2. Declare a `waitAndElevatePriorityOfDependency` dependency. This will prevent execution of this task until
  ///     the other task has finished executing. It will elevate the priority of the dependency to the same priority as
  ///     this task. This ensures that we don't get into a priority inversion problem where a high-priority task is
  ///     waiting for a low-priority task.
  ///  3. Declare a `cancelAndRescheduleDependency`. If the task dependency is idempotent and has a priority that's not
  ///     higher than this task's priority, this causes the task dependency to be cancelled, so that this task can
  ///     execute. The canceled task will be scheduled to re-run at a later point.
  ///     - Declaring a `cancelAndRescheduleDependency` dependency on a task that is not idempotent will change the
  ///       dependency to a `waitAndElevatePriorityOfDependency` dependency and log a fault.
  ///       A `cancelAndRescheduleDependency` dependency should never be emitted for a task that's not idempotent.
  ///     - If the task that should be canceled and re-scheduled has a higher priority than this task, the
  ///       `cancelAndRescheduleDependency` dependency is changed to a `waitAndElevatePriorityOfDependency`
  ///       dependency. This is done to ensure that low-priority tasks can't interfere with the execution of
  ///       high-priority tasks.
  ///     - **Important**: The task that is canceled to be rescheduled must depend on this task, otherwise the two tasks
  ///       will fight each other for execution priority.
  ///
  /// - Parameter currentlyExecutingTasks: The descriptions of all tasks that the scheduler is currently executing.
  /// - Returns: The dependency actions for those currently executing tasks that this task cannot run in parallel with.
  func dependencies(to currentlyExecutingTasks: [Self]) -> [TaskDependencyAction<Self>]

  /// Whether executing this task twice produces the same results.
  ///
  /// This is required for the task to be canceled and re-scheduled (`TaskDependencyAction.cancelAndRescheduleDependency`)
  ///
  /// Tasks that are not idempotent should never be cancelled and rescheduled in the first place. This variable is just
  /// a safety net in case non-idempotent tasks are cancelled and rescheduled. It also ensures that tasks conforming to
  /// `TaskDescriptionProtocol` think about idempotency.
  var isIdempotent: Bool { get }

  /// The number of CPU cores this task is expected to use.
  ///
  /// If the `TaskScheduler` only allows 4 concurrent tasks and a task has `estimatedCPUCoreCount == 4`, this means that
  /// no other tasks will be scheduled while this task is executing. Note that the `TaskScheduler` might over-subscribe
  /// itself to start executing this task though, ie. it only needs to have one available execution slot even if this
  /// task will use 4 CPU cores. This ensures that we get to schedule a 4-core high-priority task in a 4 core scheduler
  /// if there are 100 low-priority 1-core tasks in the queue. Otherwise we would just keep executing those whenever a
  /// slot opens up and only have enough available slots to execute the 4-core high-priority task when all the
  /// low-priority tasks are done.
  ///
  /// For example, this is used by preparation tasks that are known to prepare multiple targets (or source files within
  /// one target) in parallel.
  var estimatedCPUCoreCount: Int { get }
}

/// Parameter that's passed to `executionStateChangedCallback` to indicate the new state of a scheduled task.
public enum TaskExecutionState {
  /// The task started executing.
  case executing

  /// The task was cancelled and will be re-scheduled for execution later. Will be followed by another call with
  /// `executing`.
  case cancelledToBeRescheduled

  /// The task has finished executing. No more state updates will come after this one.
  case finished
}

/// A task that has been handed to a `TaskScheduler`.
///
/// Clients interact with the long-lived `resultTask` (through `waitToFinish`, `waitToFinishPropagatingCancellation`
/// and `cancel`), while the scheduler drives the actual work through `execute`, which may run several times if the
/// task gets cancelled to be rescheduled in between.
public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
  /// Result of `executionTask` / the tasks in `executionTaskCreatedContinuation`.
  /// See doc comment on `executionTask`.
  enum ExecutionTaskFinishStatus {
    /// The execution task ran to completion (or was cancelled without a reschedule request).
    case terminated
    /// The execution task was cancelled through `cancelToBeRescheduled` and is expected to be executed again.
    case cancelledToBeRescheduled
  }

  /// The `TaskDescription` that defines what the queued task does.
  ///
  /// This is also used to determine dependencies between running tasks.
  nonisolated let description: TaskDescription

  /// The `Task` that produces the actual result of the `QueuedTask`. This is the task that is visible to clients.
  ///
  /// See initialization of this task to see how it works.
  ///
  /// - Note: Implicitly unwrapped optional so the task's closure can access `self`.
  /// - Note: `nonisolated(unsafe)` is fine because it will never get modified after being set in the initializer.
  nonisolated(unsafe) private(set) var resultTask: Task<Void, Never>! = nil

  /// After `execute` is called, the `executionTask` is a task that performs the computation defined by
  /// `description.execute`.
  ///
  /// The `resultTask` effectively waits for this task to be set (by watching for new values produced by
  /// `executionTaskCreatedContinuation`) and awaits its result. The task can terminate with two different statuses:
  ///  - `terminated`: The task has finished executing and the `resultTask` is done.
  ///  - `cancelledToBeRescheduled`: The `executionTask` was cancelled by calling `QueuedTask.cancelToBeRescheduled()`.
  ///    In this case the `TaskScheduler` is expected to call `execute` again, which will produce a new
  ///    `executionTask`. `resultTask` then awaits the creation of the new `executionTask` and then the result of that
  ///    `executionTask`.
  private var executionTask: Task<ExecutionTaskFinishStatus, Never>?

  /// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
  private let executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation

  /// Backing storage for `priority`, holding the raw value of the `TaskPriority`.
  private let _priority: AtomicUInt8

  /// The latest known priority of the task.
  ///
  /// This starts off as the priority with which the task is being created. If higher priority tasks start depending on
  /// it, the priority may get elevated.
  nonisolated var priority: TaskPriority {
    get {
      TaskPriority(rawValue: _priority.value)
    }
    set {
      _priority.value = newValue.rawValue
    }
  }

  /// Whether `cancelToBeRescheduled` has been called on this `QueuedTask`.
  ///
  /// Gets reset when `execute` or `finalizeExecution` reports `.cancelledToBeRescheduled` to the scheduler.
  private var cancelledToBeRescheduled: Bool = false

  /// Whether `resultTask` has been cancelled.
  private let resultTaskCancelled: AtomicBool = .init(initialValue: false)

  /// Backing storage for `isExecuting`. Set in `execute` and cleared in `finalizeExecution`.
  private let _isExecuting: AtomicBool = .init(initialValue: false)

  /// Whether the task is currently executing or still queued to be executed later.
  public nonisolated var isExecuting: Bool {
    return _isExecuting.value
  }

  /// Cancel the task by cancelling `resultTask`.
  ///
  /// The cancellation handler installed on `resultTask` records the cancellation in `resultTaskCancelled`, which
  /// `execute` checks before calling `description.execute()`.
  public nonisolated func cancel() {
    resultTask.cancel()
  }

  /// Wait for the task to finish.
  ///
  /// If the task that waits for this queued task to finish is cancelled, the QueuedTask will still continue
  /// executing.
  public func waitToFinish() async {
    return await resultTask.value
  }

  /// Wait for the task to finish.
  ///
  /// If the task that waits for this queued task to finish is cancelled, the QueuedTask will also be cancelled.
  /// This assumes that the caller of this method has unique control over the task and is the only one interested in its
  /// value.
  public func waitToFinishPropagatingCancellation() async {
    return await resultTask.valuePropagatingCancellation
  }

  /// A callback that will be called when the task starts executing, is cancelled to be rescheduled, or when it finishes
  /// execution.
  private let executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?

  /// Create a queued task and start its `resultTask`.
  ///
  /// The description is not executed until `execute` is called.
  ///
  /// - Parameters:
  ///   - priority: The initial priority of the task. Also the priority at which `resultTask` is created.
  ///   - description: The description of the work to perform.
  ///   - executionStateChangedCallback: Called when the execution state of the task changes, see `TaskExecutionState`.
  init(
    priority: TaskPriority,
    description: TaskDescription,
    executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?
  ) async {
    self._priority = AtomicUInt8(initialValue: priority.rawValue)
    self.description = description
    self.executionStateChangedCallback = executionStateChangedCallback

    // Extract the stream's continuation so that `execute` can publish newly created execution tasks to `resultTask`.
    var executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation!
    let executionTaskCreatedStream = AsyncStream {
      executionTaskCreatedContinuation = $0
    }
    self.executionTaskCreatedContinuation = executionTaskCreatedContinuation

    self.resultTask = Task.detached(priority: priority) {
      await withTaskCancellationHandler {
        await withTaskPriorityChangedHandler(initialPriority: self.priority) {
          // Await every execution task that `execute` creates until one of them terminates for good.
          for await task in executionTaskCreatedStream {
            switch await task.valuePropagatingCancellation {
            case .cancelledToBeRescheduled:
              // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
              break
            case .terminated:
              // The task finished. We are done with this `QueuedTask`
              return
            }
          }
        } taskPriorityChanged: {
          withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
            logger.debug(
              "Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
            )
          }
          // Keep the recorded priority in sync with the priority `resultTask` is now running at.
          self.priority = Task.currentPriority
        }
      } onCancel: {
        // Remember the cancellation so that `execute` can skip `description.execute()` if it hasn't started yet.
        self.resultTaskCancelled.value = true
      }
    }
  }

  /// Start executing the task.
  ///
  /// Execution might be canceled to be rescheduled, in which case this returns `.cancelledToBeRescheduled`. In that
  /// case the `TaskScheduler` is expected to call `execute` again.
  func execute() async -> ExecutionTaskFinishStatus {
    if cancelledToBeRescheduled {
      // `QueuedTask.execute` is called from a detached task in `TaskScheduler.poke` but we insert it into the
      // `currentlyExecutingTasks` queue beforehand. This leaves a short window in which we could cancel the task to
      // reschedule it before it actually starts executing.
      // If this happens, we don't have to do anything in `execute` and can immediately return. `execute` will be called
      // again when the task gets rescheduled.
      cancelledToBeRescheduled = false
      return .cancelledToBeRescheduled
    }
    precondition(executionTask == nil, "Task started twice")
    let task = Task.detached(priority: self.priority) {
      // Skip the actual work if the execution task or the client-visible `resultTask` has already been cancelled.
      if !Task.isCancelled && !self.resultTaskCancelled.value {
        await self.description.execute()
      }
      return await self.finalizeExecution()
    }
    executionTask = task
    // Hand the new execution task to `resultTask`, which awaits it.
    executionTaskCreatedContinuation.yield(task)
    _isExecuting.value = true
    await executionStateChangedCallback?(self, .executing)
    return await task.value
  }

  /// Implementation detail of `execute` that is called after `self.description.execute()` finishes.
  ///
  /// Clears the execution state, notifies `executionStateChangedCallback` and computes the finish status of the
  /// execution task.
  private func finalizeExecution() async -> ExecutionTaskFinishStatus {
    self.executionTask = nil
    _isExecuting.value = false
    // Only a cancellation that was requested through `cancelToBeRescheduled` leads to a reschedule. Any other
    // outcome, including a plain cancellation, terminates the task.
    if Task.isCancelled && self.cancelledToBeRescheduled {
      await executionStateChangedCallback?(self, .cancelledToBeRescheduled)
      self.cancelledToBeRescheduled = false
      return ExecutionTaskFinishStatus.cancelledToBeRescheduled
    } else {
      await executionStateChangedCallback?(self, .finished)
      return ExecutionTaskFinishStatus.terminated
    }
  }

  /// Cancel the task to be rescheduled later.
  ///
  /// If there is no `executionTask` at the moment, this only records the request in `cancelledToBeRescheduled`, which
  /// makes the next call to `execute` return `.cancelledToBeRescheduled` immediately.
  func cancelToBeRescheduled() {
    self.cancelledToBeRescheduled = true
    guard let executionTask else {
      return
    }
    executionTask.cancel()
    self.executionTask = nil
  }

  /// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
  /// a new task that depends on it. Otherwise a no-op.
  nonisolated func elevatePriority(to targetPriority: TaskPriority) {
    if priority < targetPriority {
      withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
        logger.debug(
          "Elevating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(targetPriority.rawValue)"
        )
      }
      // Awaiting the result task from a higher-priority task will eventually update `priority` through
      // `withTaskPriorityChangedHandler` but that might take a while because `withTaskPriorityChangedHandler` polls.
      // Since we know that the priority will be elevated, set it now. That way we don't try to elevate it again.
      self.priority = targetPriority
      Task(priority: targetPriority) {
        await self.resultTask.value
      }
    }
  }
}

/// Schedules an unordered list of tasks for execution.
///
/// The key features that `TaskScheduler` provides are:
///  - It allows the dynamic declaration of dependencies between tasks. A task can declare whether it can be executed
///    based on which other tasks are currently running. For example, this allows us to guarantee that only a single
///    preparation task is running at a time without enforcing any order in which the preparation tasks should run.
///  - It allows the maximum number of tasks to be limited at a given priority. This allows us to eg. only use half the
///    computer's cores for background indexing and using all cores if user interaction is depending on a set of files
///    being indexed without over-subscribing the CPU.
///  - It allows tasks to be canceled and rescheduled to make room for tasks that are faster to execute. For example,
///    this is used when we have a joint background index task for file `A`, `B` and `C` (which might be in the same
///    target) with low priority. We now request to index `A` with high priority separately because it's needed for user
///    interaction. This cancels the joint indexing of `A`, `B` and `C` so that `A` can be indexed as a standalone file
///    as quickly as possible. The joint indexing of `A`, `B` and `C` is then re-scheduled (again at low priority) and
///    will depend on `A` being indexed.
public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
  /// The tasks that are currently being executed.
  ///
  /// All tasks in this queue are guaranteed to trigger a call to `poke` again once they finish. Thus, whenever there
  /// are items left in this array, we are guaranteed to get another call to `poke`
  private var currentlyExecutingTasks: [QueuedTask<TaskDescription>] = []

  /// The queue of pending tasks that haven't been scheduled for execution yet.
  private var pendingTasks: [QueuedTask<TaskDescription>] = []

  /// An ordered list that maps task priorities to the number of tasks that might execute concurrently at that (or a
  /// higher) priority.
  ///
  /// This list is sorted in descending priority order.
  ///
  /// The `maxConcurrentTasks` of the last element in this list is also used for tasks with a lower priority.
  ///
  /// For example if you have
  /// ```swift
  /// [
  ///   (.medium, 4),
  ///   (.low, 2)
  /// ]
  /// ```
  ///
  /// Then we allow the following number of concurrent tasks at the following priorities
  ///  - `.high`: 4
  ///  - `.medium`: 4
  ///  - `.low`: 2
  ///  - `.background`: 2
  private let maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]

  /// Create a scheduler with the given concurrency limits.
  ///
  /// - Parameter maxConcurrentTasksByPriority: The maximum number of concurrently executing tasks per priority. The
  ///   entries may be passed in any order; they are sorted by descending priority before being validated and stored.
  ///
  /// - Precondition: The list must not be empty, a lower priority must never allow more concurrent tasks than a
  ///   higher priority and the lowest priority must allow at least one task.
  public init(maxConcurrentTasksByPriority: [(priority: TaskPriority, maxConcurrentTasks: Int)]) {
    // Validate the *sorted* list. Inside this initializer the bare name `maxConcurrentTasksByPriority` refers to the
    // unsorted parameter, so checking it would reject valid configurations that are passed in a different order and
    // could accept configurations that violate the invariants of the stored list.
    let sortedLimits = maxConcurrentTasksByPriority.sorted(by: { $0.priority > $1.priority })
    precondition(!sortedLimits.isEmpty, "maxConcurrentTasksByPriority must not be empty")
    precondition(
      sortedLimits.map(\.maxConcurrentTasks).isSorted(descending: true),
      "A lower priority must not allow more concurrent tasks than a higher priority"
    )
    // `last!` is fine because we checked that `sortedLimits` is not empty above.
    precondition(
      sortedLimits.last!.maxConcurrentTasks >= 1,
      "The lowest priority must allow at least one concurrent task"
    )
    self.maxConcurrentTasksByPriority = sortedLimits
  }

  /// Enqueue a new task to be executed.
  ///
  /// - Important: A task that is scheduled by `TaskScheduler` must never be awaited from a task that runs on
  ///   `TaskScheduler`. Otherwise we might end up in deadlocks, eg. if the inner task cannot be scheduled because the
  ///   outer task is claiming all execution slots in the `TaskScheduler`.
  ///
  /// - Parameters:
  ///   - priority: The priority of the task. Defaults to the priority of the calling task.
  ///   - taskDescription: The description of the work to perform.
  ///   - executionStateChangedCallback: Called when the execution state of the task changes, see `TaskExecutionState`.
  /// - Returns: The queued task, which can be used to await or cancel the work.
  @discardableResult
  public func schedule(
    priority: TaskPriority? = nil,
    _ taskDescription: TaskDescription,
    @_inheritActorContext executionStateChangedCallback: (
      @Sendable (QueuedTask<TaskDescription>, TaskExecutionState) async -> Void
    )? = nil
  ) async -> QueuedTask<TaskDescription> {
    let queuedTask = await QueuedTask(
      priority: priority ?? Task.currentPriority,
      description: taskDescription,
      executionStateChangedCallback: executionStateChangedCallback
    )
    pendingTasks.append(queuedTask)
    Task.detached(priority: priority ?? Task.currentPriority) {
      // Poke the `TaskScheduler` to execute a new task. If the `TaskScheduler` is already working at its capacity
      // limit, this will not do anything. If there are execution slots available, this will start executing the freshly
      // queued task.
      await self.poke()
    }
    return queuedTask
  }

  /// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
  private func maxConcurrentTasks(at priority: TaskPriority) -> Int {
    // The list is sorted by descending priority, so the first entry whose priority does not exceed `priority` is the
    // one that applies.
    for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
      if atPriority <= priority {
        return maxConcurrentTasks
      }
    }
    // `last!` is fine because the initializer of `maxConcurrentTasksByPriority` has a precondition that
    // `maxConcurrentTasksByPriority` is not empty.
    return maxConcurrentTasksByPriority.last!.maxConcurrentTasks
  }

  /// Poke the execution of more tasks in the queue.
  ///
  /// This gets called whenever a task is scheduled and whenever a task finishes executing, so it keeps being called
  /// until the queue is empty.
  private func poke() {
    pendingTasks.sort(by: { $0.priority > $1.priority })
    for task in pendingTasks {
      if currentlyExecutingTasks.map(\.description.estimatedCPUCoreCount).sum() >= maxConcurrentTasks(at: task.priority)
      {
        // We don't have any execution slots left. Thus, this poker has nothing to do and is done.
        // When the next task finishes, it calls `poke` again.
        // If the priority of a lower-priority pending task gets elevated, that task will be picked up on the next
        // `poke` call.
        return
      }
      let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
      // The currently executing tasks that `task` needs to wait for before it can start.
      let waitForTasks = dependencies.compactMap { (taskDependency) -> QueuedTask<TaskDescription>? in
        switch taskDependency {
        case .cancelAndRescheduleDependency(let taskDescription):
          guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
          else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
              )
            }
            return nil
          }
          if !taskDescription.isIdempotent {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
            }
            return dependency
          }
          if dependency.priority > task.priority {
            // Don't reschedule tasks that are more important than the new task we would like to schedule.
            return dependency
          }
          // The dependency may be cancelled and rescheduled, so we don't need to wait for it.
          return nil
        case .waitAndElevatePriorityOfDependency(let taskDescription):
          guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
          else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
              )
            }
            return nil
          }
          return dependency
        }
      }
      if !waitForTasks.isEmpty {
        // This task is blocked by a task that's currently executing. Elevate the priorities of those tasks and continue
        // looking in the queue if there is another task we can execute.
        for waitForTask in waitForTasks {
          waitForTask.elevatePriority(to: task.priority)
        }
        continue
      }
      // We only get here if none of the dependencies need to be waited for, so every `cancelAndRescheduleDependency`
      // refers to a task that may be cancelled.
      let rescheduleTasks = dependencies.compactMap { (taskDependency) -> QueuedTask<TaskDescription>? in
        switch taskDependency {
        case .cancelAndRescheduleDependency(let taskDescription):
          guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
          else {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.fault(
                "Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
              )
            }
            return nil
          }
          return dependency
        case .waitAndElevatePriorityOfDependency:
          return nil
        }
      }
      if !rescheduleTasks.isEmpty {
        Task.detached(priority: task.priority) {
          for rescheduleTask in rescheduleTasks {
            withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
              logger.debug("Suspending \(rescheduleTask.description.forLogging)")
            }
            await rescheduleTask.cancelToBeRescheduled()
          }
        }
        // Don't go looking for other tasks to execute in this poker because we should be waiting for the rescheduled
        // tasks to finish (which will call `poke` again), and then actually schedule `task`.
        // If we did enqueue another task from the pending queue, that new task might introduce a new dependency for
        // `task`, which could delay its execution and render the suspension of previous tasks useless.
        return
      }

      currentlyExecutingTasks.append(task)
      pendingTasks.removeAll(where: { $0 === task })
      Task.detached(priority: task.priority) {
        // Await the task's return in a task so that this poker can continue checking if there are more execution
        // slots that can be filled with queued tasks.
        let finishStatus = await task.execute()
        await self.finalizeTaskExecution(task: task, finishStatus: finishStatus)
      }
    }
  }

  /// Implementation detail of `poke` to be called after `task.execute()` to ensure that `task.execute()` executes in
  /// a different isolation domain than `TaskScheduler`.
  ///
  /// Removes `task` from the list of currently executing tasks, re-enqueues it if it was cancelled to be rescheduled
  /// and pokes the scheduler to fill the execution slot that just became available.
  private func finalizeTaskExecution(
    task: QueuedTask<TaskDescription>,
    finishStatus: QueuedTask<TaskDescription>.ExecutionTaskFinishStatus
  ) async {
    currentlyExecutingTasks.removeAll(where: { $0.description.id == task.description.id })
    switch finishStatus {
    case .terminated: break
    case .cancelledToBeRescheduled: pendingTasks.append(task)
    }
    self.poke()
  }
}

extension TaskScheduler {
  /// A scheduler for use in tests whose only concurrency limit is the number of processors of the machine, applied
  /// from the `.low` priority entry.
  @_spi(Testing)
  public static var forTesting: TaskScheduler {
    let processorCount = ProcessInfo.processInfo.processorCount
    return TaskScheduler(maxConcurrentTasksByPriority: [(priority: .low, maxConcurrentTasks: processorCount)])
  }
}

// MARK: - Collection utilities

fileprivate extension Collection where Element: Comparable {
  /// Whether the elements of the collection are in sorted order.
  ///
  /// The check is non-strict, ie. adjacent elements that compare equal are considered to be in order. Empty and
  /// single-element collections are always sorted.
  ///
  /// - Parameter descending: If `true`, check that no element is greater than its predecessor. If `false`, check that
  ///   no element is less than its predecessor.
  /// - Returns: `true` if every pair of adjacent elements is in the requested order.
  func isSorted(descending: Bool) -> Bool {
    // Compare each element with its successor. Pairing the collection with itself shifted by one means the first
    // element is never compared to itself, which previously made every non-empty ascending check fail.
    return zip(self, self.dropFirst()).allSatisfy { (previous, next) in
      descending ? previous >= next : previous <= next
    }
  }
}

fileprivate extension Collection<Int> {
  /// The total of all elements in the collection, or `0` if the collection is empty.
  func sum() -> Int {
    return self.reduce(0, +)
  }
}

/// Non-throwing flavor of `withTaskPriorityChangedHandler`.
///
/// Forwards all arguments to the throwing `withTaskPriorityChangedHandler` and absorbs every error it throws. A
/// `CancellationError` is dropped silently, any other error is logged as a fault.
fileprivate func withTaskPriorityChangedHandler(
  initialPriority: TaskPriority = Task.currentPriority,
  pollingInterval: Duration = .seconds(0.1),
  @_inheritActorContext operation: @escaping @Sendable () async -> Void,
  taskPriorityChanged: @escaping @Sendable () -> Void
) async {
  // Spell out the throwing function type so that the call below resolves to the throwing variant (presumably declared
  // in one of the imported modules) instead of recursing into this function.
  let throwingOperation = operation as @Sendable () async throws -> Void
  do {
    try await withTaskPriorityChangedHandler(
      initialPriority: initialPriority,
      pollingInterval: pollingInterval,
      operation: throwingOperation,
      taskPriorityChanged: taskPriorityChanged
    )
  } catch {
    // Since `operation` does not throw, the only error we expect to see here is a `CancellationError`, in which case
    // we can just return. Anything else is unexpected and worth a fault.
    guard !(error is CancellationError) else {
      return
    }
    logger.fault("Unexpected error thrown from withTaskPriorityChangedHandler: \(error.forLogging)")
  }
}