File: Task.cpp (Debian package swiftlang 6.0.3-2)

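// Back-deployment overrides of swift_task_future_wait,
// swift_task_future_wait_throwing, and swift_task_create_common for the
// Swift 5.6 concurrency runtime. Each override receives the runtime's
// original implementation as its trailing parameter; the future-wait
// overrides reimplement the wait entirely, while the task-creation override
// only adjusts the requested initial context size before calling through.
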
#include "Concurrency/Task.h"

#include "Concurrency/TaskPrivate.h"
#include "Concurrency/Error.h"
#include "Overrides.h"

using namespace swift;
using FutureFragment = AsyncTask::FutureFragment;
using TaskGroup = swift::TaskGroup;

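// Storage for the slab metadata object used by the task allocator.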
Metadata swift::TaskAllocatorSlabMetadata;

FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
                                             AsyncContext *waitingTaskContext,
                                             TaskContinuationFunction *resumeFn,
                                             AsyncContext *callerContext,
                                             OpaqueValue *result) {
  SWIFT_TASK_DEBUG_LOG("compat 56 task task %p", this);
  using Status = FutureFragment::Status;
  using WaitQueueItem = FutureFragment::WaitQueueItem;

  assert(isFuture());
  auto fragment = futureFragment();

  auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
  bool contextInitialized = false;
  auto escalatedPriority = JobPriority::Unspecified;
  while (true) {
    switch (queueHead.getStatus()) {
    case Status::Error:
    case Status::Success:
      SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, completed immediately",
                           waitingTask, this);
      _swift_tsan_acquire(static_cast<Job *>(this));
      if (contextIntialized) waitingTask->flagAsRunning();
      // The task is done; we don't need to wait.
      return queueHead.getStatus();

    case Status::Executing:
      SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, going to sleep",
                           waitingTask, this);
      _swift_tsan_release(static_cast<Job *>(waitingTask));
      // Task is not complete. We'll need to add ourselves to the queue.
      break;
    }

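    // Lazily initialize the wait context and flag the waiting task as
    // suspended only once we have observed the future still executing; if
    // the future completes before our enqueue succeeds, the flagAsRunning()
    // call above undoes the suspension.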
    if (!contextInitialized) {
      contextInitialized = true;
      auto context =
          reinterpret_cast<TaskFutureWaitAsyncContext *>(waitingTaskContext);
      context->errorResult = nullptr;
      context->successResultPointer = result;
      context->ResumeParent = resumeFn;
      context->Parent = callerContext;
      waitingTask->flagAsSuspended();
    }

    // Escalate the blocking task to the priority of the waiting task.
    // FIXME: Also record that the waiting task is now waiting on the
    // blocking task so that escalators of the waiting task can propagate
    // the escalation to the blocking task.
    //
    // Recording this dependency is tricky because we need escalators
    // to be able to escalate without worrying about the blocking task
    // concurrently finishing, resuming the escalated task, and being
    // invalidated.  So we're not doing that yet.  In the meantime, we
    // do the best-effort alternative of escalating the blocking task
    // as a one-time deal to the current priority of the waiting task.
    // If the waiting task is escalated after this point, the priority
    // will not be escalated, but that's inevitable in the absence of
    // propagation during escalation.
    //
    // We have to do the escalation before we successfully enqueue the
    // waiting task on the blocking task's wait queue, because as soon as
    // we do, this thread is no longer blocking the resumption of the
    // waiting task, and so both the blocking task (which is retained
    // during the wait only from the waiting task's perspective) and the
    // waiting task (which can simply terminate) must be treated as
    // invalidated from this thread's perspective.
    //
    // When we do fix this bug to record the dependency, we will have to
    // do it before this escalation of the blocking task so that there
    // isn't a race where an escalation of the waiting task can fail
    // to propagate to the blocking task.  The correct priority to
    // escalate to is the priority we observe when we successfully record
    // the dependency; any later escalations will automatically propagate.
    //
    // If the blocking task finishes while we're doing this escalation,
    // the escalation will be innocuous.  The wasted effort is acceptable;
    // programmers should be encouraged to give tasks that will block
    // other tasks the correct priority to begin with.
    auto waitingStatus =
      waitingTask->_private().Status.load(std::memory_order_relaxed);
    if (waitingStatus.getStoredPriority() > escalatedPriority) {
      swift_task_escalateBackdeploy56(this, waitingStatus.getStoredPriority());
      escalatedPriority = waitingStatus.getStoredPriority();
    }

    // Put the waiting task at the beginning of the wait queue.
    waitingTask->getNextWaitingTask() = queueHead.getTask();
    auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
    if (fragment->waitQueue.compare_exchange_weak(
            queueHead, newQueueHead,
            /*success*/ std::memory_order_release,
            /*failure*/ std::memory_order_acquire)) {
      _swift_task_clearCurrent();
      return FutureFragment::Status::Executing;
    }
  }
}

//===--- swift_task_future_wait -------------------------------------------===//

SWIFT_CC(swiftasync)
static void
task_future_wait_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {
  return _context->ResumeParent(_context->Parent);
}

#ifdef __ARM_ARCH_7K__
__attribute__((noinline))
SWIFT_CC(swiftasync) static void workaround_function_swift_task_future_waitImpl(
    OpaqueValue *result, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
    AsyncTask *task, TaskContinuationFunction resumeFunction,
    AsyncContext *callContext) {
  // Make sure we don't eliminate calls to this function.
  asm volatile("" // Do nothing.
               :  // Output list, empty.
               : "r"(result), "r"(callerContext), "r"(task) // Input list.
               : // Clobber list, empty.
  );
  return;
}
#endif

void SWIFT_CC(swiftasync) swift::swift56override_swift_task_future_wait(
                                            OpaqueValue *result,
                                            SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
                                            AsyncTask *task,
                                            TaskContinuationFunction *resumeFn,
                                            AsyncContext *callContext,
                                            TaskFutureWait_t *original) {
  // Suspend the waiting task.
  auto waitingTask = swift_task_getCurrent();
  waitingTask->ResumeTask = task_future_wait_resume_adapter;
  waitingTask->ResumeContext = callContext;

  // Wait on the future.
  assert(task->isFuture());

  switch (task->waitFuture(waitingTask, callContext, resumeFn, callerContext,
                           result)) {
  case FutureFragment::Status::Executing:
    // The waiting task has been queued on the future.
#ifdef __ARM_ARCH_7K__
    return workaround_function_swift_task_future_waitImpl(
        result, callerContext, task, resumeFn, callContext);
#else
    return;
#endif

  case FutureFragment::Status::Success: {
    // Run the task with a successful result.
    auto future = task->futureFragment();
    future->getResultType()->vw_initializeWithCopy(result,
                                                   future->getStoragePtr());
    return resumeFn(callerContext);
  }

  case FutureFragment::Status::Error:
    swift_Concurrency_fatalError(0, "future reported an error, but wait cannot throw");
  }
}

//===--- swift_task_future_wait_throwing ----------------------------------===//

SWIFT_CC(swiftasync)
static void task_wait_throwing_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {
  auto context = static_cast<TaskFutureWaitAsyncContext *>(_context);
  auto resumeWithError =
      reinterpret_cast<AsyncVoidClosureEntryPoint *>(context->ResumeParent);
  return resumeWithError(context->Parent, context->errorResult);
}

#ifdef __ARM_ARCH_7K__
__attribute__((noinline))
SWIFT_CC(swiftasync) static void workaround_function_swift_task_future_wait_throwingImpl(
    OpaqueValue *result, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
    AsyncTask *task, ThrowingTaskFutureWaitContinuationFunction resumeFunction,
    AsyncContext *callContext) {
  // Make sure we don't eliminate calls to this function.
  asm volatile("" // Do nothing.
               :  // Output list, empty.
               : "r"(result), "r"(callerContext), "r"(task) // Input list.
               : // Clobber list, empty.
  );
  return;
}
#endif

void SWIFT_CC(swiftasync) swift::swift56override_swift_task_future_wait_throwing(
                                            OpaqueValue *result,
                                            SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
                                            AsyncTask *task,
                                            ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
                                            AsyncContext *callContext,
                                            TaskFutureWaitThrowing_t *original) {
  auto waitingTask = swift_task_getCurrent();
  // Suspend the waiting task.
  waitingTask->ResumeTask = task_wait_throwing_resume_adapter;
  waitingTask->ResumeContext = callContext;

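  // waitFuture expects a non-throwing continuation signature; the throwing
  // resume adapter above recovers the error-taking signature from the saved
  // context when the future completes.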
  auto resumeFn = reinterpret_cast<TaskContinuationFunction *>(resumeFunction);

  // Wait on the future.
  assert(task->isFuture());

  switch (task->waitFuture(waitingTask, callContext, resumeFn, callerContext,
                           result)) {
  case FutureFragment::Status::Executing:
    // The waiting task has been queued on the future.
#ifdef __ARM_ARCH_7K__
    return workaround_function_swift_task_future_wait_throwingImpl(
        result, callerContext, task, resumeFunction, callContext);
#else
    return;
#endif

  case FutureFragment::Status::Success: {
    auto future = task->futureFragment();
    future->getResultType()->vw_initializeWithCopy(result,
                                                   future->getStoragePtr());
    return resumeFunction(callerContext, nullptr /*error*/);
  }

  case FutureFragment::Status::Error: {
    // Run the task with an error result.
    auto future = task->futureFragment();
    auto error = future->getError();
    swift_errorRetain(error);
    return resumeFunction(callerContext, error);
  }
  }
}

//===--- swift_task_create_common -----------------------------------------===//

// NOTE: this function is currently only installed as an override on
// 64-bit targets.  The fix in it has been written to work correctly
// on any target, though, so if you need to use it for a more general
// fix, you should be able to just define and install it unconditionally.
#if __POINTER_WIDTH__ == 64

AsyncTaskAndContext SWIFT_CC(swift)
swift::swift56override_swift_task_create_common(
    size_t rawTaskCreateFlags,
    TaskOptionRecord *options,
    const Metadata *futureResultType,
    TaskContinuationFunction *function, void *closureContext,
    size_t initialContextSize,
    TaskCreateCommon_t *original) {

  // The <=5.6 versions of this function pointlessly initialize the
  // defunct Flags field in the initial context.  This initialization
  // is mostly harmless because the initial function has no expectations
  // about the non-header contents of the initial context on entry.
  // However, if the initial context doesn't include space for the Flags
  // field, and it ends up at the end of an allocation, this write can
  // go past the end of the allocation.
  //
  // The initial context is always at the end of the allocation for
  // Tasks that lack a preallocated buffer, i.e. any Task that is not
  // an async let.
  //
  // On 32-bit targets, the Flags field was at offset 8.  Since context
  // sizes are always rounded up to a multiple of MaximumAlignment,
  // initialContextSize is guaranteed to be >= 16, so the store to
  // Flags will always fall within it.  On 64-bit targets, however,
  // Flags was at offset 16.  We therefore need to ensure the initial
  // context is large enough for the unnecessary write to Flags.
  //
  // We could handle this in the compiler by ensuring that all
  // functions request at least 32 bytes of context, but that would
  // introduce a permanent overhead on thunks and other functions that
  // don't need any temporary scratch space.  We really only need to work
  // around this one store when creating tasks, and fortunately, that
  // always flows through this one function.  Since this hook receives
  // the initial function and context size directly instead of as an
  // async function pointer, it's painless for us to just change the
  // requested initial context size.
#if __POINTER_WIDTH__ == 64
  if (initialContextSize < 32) initialContextSize = 32;
#endif

  return original(rawTaskCreateFlags, options,
                  futureResultType, function, closureContext,
                  initialContextSize);
}

#endif