//------------------------------------------------------------------------------
// <copyright file="SynchronizationHelper.cs" company="Microsoft">
// Copyright (c) Microsoft Corporation. All rights reserved.
// </copyright>
//------------------------------------------------------------------------------
namespace System.Web.Util {
    using System;
    using System.Runtime.ExceptionServices;
    using System.Threading;
    using System.Threading.Tasks;

    // This class is used by the AspNetSynchronizationContext to assist with scheduling tasks in a non-blocking fashion.
    // Asynchronous work will be queued and will execute sequentially, never consuming more than a single thread at a time.
    // Synchronous work will block and will execute on the current thread.
    internal sealed class SynchronizationHelper {
        private Task _completionTask; // the Task that will run when all in-flight operations have completed
        private Thread _currentThread; // the Thread that's running the current Task; all threads must see the same value for this field
        private Task _lastScheduledTask = CreateInitialTask(); // the last Task that was queued to this helper, used to hook future Tasks (not volatile since always accessed under lock)
        private Task _lastScheduledTaskAsync = CreateInitialTask(); // the last async Task that was queued to this helper (also always accessed under lock)
        private readonly object _lockObj = new object(); // synchronizes access to _lastScheduledTask and _lastScheduledTaskAsync
        private int _operationsInFlight; // operation counter
        private readonly ISyncContext _syncContext; // a context that wraps an operation with pre- and post-execution phases
        private readonly Action<bool> _appVerifierCallback; // for making sure that developers don't try calling us after the request has completed
        public SynchronizationHelper(ISyncContext syncContext) {
            _syncContext = syncContext;
            _appVerifierCallback = AppVerifier.GetSyncContextCheckDelegate(syncContext);
        }
        // If an operation results in an exception, this property will provide access to it.
        public ExceptionDispatchInfo Error { get; set; }

        // Helper to access the _currentThread field in a thread-safe fashion.
        // It is not enough to mark the _currentThread field volatile, since that only guarantees
        // read / write ordering and doesn't ensure that each thread sees the same value.
        private Thread CurrentThread {
            get { return Interlocked.CompareExchange(ref _currentThread, null, null); }
            set { Interlocked.Exchange(ref _currentThread, value); }
        }
        // Returns the number of pending operations
        public int PendingCount { get { return ChangeOperationCount(0); } }

        public int ChangeOperationCount(int addend) {
            int newOperationCount = Interlocked.Add(ref _operationsInFlight, addend);
            if (newOperationCount == 0) {
                // if an asynchronous completion operation is queued, run it
                Task completionTask = Interlocked.Exchange(ref _completionTask, null);
                if (completionTask != null) {
                    completionTask.Start();
                }
            }
            return newOperationCount;
        }
        private void CheckForRequestStateIfRequired(bool checkForReEntry) {
            if (_appVerifierCallback != null) {
                _appVerifierCallback(checkForReEntry);
            }
        }

        // Creates the initial hook that future operations can ride off of
        private static Task CreateInitialTask() {
            return Task.FromResult<object>(null);
        }
        // Takes control of this SynchronizationHelper instance synchronously. Asynchronous operations
        // will be queued but will not be dispatched until control is released (by disposing of the
        // returned object). This operation might block if a different thread is currently in
        // control of the context.
        public IDisposable EnterSynchronousControl() {
            if (CurrentThread == Thread.CurrentThread) {
                // If the current thread already has control of this context, there's nothing extra to do.
                return DisposableAction.Empty;
            }

            // used to mark the end of the synchronous task
            TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
            Task lastTask;
            lock (_lockObj) {
                lastTask = _lastScheduledTask;
                _lastScheduledTask = tcs.Task; // future work can be scheduled off this Task
            }

            // The original task may end up Faulted, which would make its Wait() method throw an exception.
            // To avoid this, we instead wait on a continuation which is always guaranteed to complete successfully.
            if (!lastTask.IsCompleted) { lastTask.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously).Wait(); }
            CurrentThread = Thread.CurrentThread;

            // synchronous control is released by marking the Task as complete
            return new DisposableAction(() => {
                CurrentThread = null;
                tcs.TrySetResult(null);
            });
        }
        public void QueueAsynchronous(Action action) {
            CheckForRequestStateIfRequired(checkForReEntry: true);
            ChangeOperationCount(+1);

            // This method only schedules work; it doesn't itself do any work. The lock is held for a very
            // short period of time.
            lock (_lockObj) {
                Task newTask = _lastScheduledTask.ContinueWith(_ => SafeWrapCallback(action), TaskScheduler.Default);
                _lastScheduledTask = newTask; // the newly-created task is now the last one
            }
        }
        // QueueAsynchronousAsync and SafeWrapCallbackAsync guarantee:
        // 1. Funcs posted here complete in the order they were posted (first come, first complete).
        // 2. Their executions never overlap.
        public void QueueAsynchronousAsync(Func<object, Task> func, object state) {
            CheckForRequestStateIfRequired(checkForReEntry: true);
            ChangeOperationCount(+1);

            // This method only schedules work; it doesn't itself do any work. The lock is held for a very
            // short period of time.
            lock (_lockObj) {
                // 1. Note that we chain newTask off _lastScheduledTaskAsync, not _lastScheduledTask.
                //    Chaining newTask off _lastScheduledTask would cause a deadlock.
                // 2. The call to Unwrap() is required. ContinueWith returns a Task<T>, where T is the return
                //    type of the delegate passed to it. Because an async delegate returns a Task, the result
                //    here is a Task<Task>, and the outer task completes as soon as the delegate returns its
                //    inner Task, which happens at the first await. Unwrap() yields a Task that completes
                //    only when the inner Task does.
                Task newTask = _lastScheduledTaskAsync.ContinueWith(
                    async _ => { await SafeWrapCallbackAsync(func, state); }).Unwrap();
                _lastScheduledTaskAsync = newTask; // the newly-created task is now the last one
            }
        }
        public void QueueSynchronous(Action action) {
            CheckForRequestStateIfRequired(checkForReEntry: false);
            if (CurrentThread == Thread.CurrentThread) {
                // current thread already owns the context, so just execute inline to prevent deadlocks
                action();
                return;
            }

            ChangeOperationCount(+1);
            using (EnterSynchronousControl()) {
                SafeWrapCallback(action);
            }
        }
        private void SafeWrapCallback(Action action) {
            // This method will try to catch exceptions so that they don't bubble up to our
            // callers. However, ThreadAbortExceptions will continue to bubble up.
            try {
                CurrentThread = Thread.CurrentThread;
                ISyncContextLock syncContextLock = null;
                try {
                    syncContextLock = (_syncContext != null) ? _syncContext.Enter() : null;
                    try {
                        action();
                    }
                    catch (Exception ex) {
                        Error = ExceptionDispatchInfo.Capture(ex);
                    }
                }
                finally {
                    if (syncContextLock != null) {
                        syncContextLock.Leave();
                    }
                }
            }
            finally {
                CurrentThread = null;
                ChangeOperationCount(-1);
            }
        }
        // This method does not run the func by itself. It simply queues the func into the existing
        // syncContext queue.
        private async Task SafeWrapCallbackAsync(Func<object, Task> func, object state) {
            try {
                TaskCompletionSource<Task> tcs = new TaskCompletionSource<Task>();
                QueueAsynchronous(() => {
                    var t = func(state);
                    t.ContinueWith((_) => {
                        if (t.IsFaulted) {
                            tcs.TrySetException(t.Exception.InnerExceptions);
                        }
                        else if (t.IsCanceled) {
                            tcs.TrySetCanceled();
                        }
                        else {
                            tcs.TrySetResult(t);
                        }
                    }, TaskContinuationOptions.ExecuteSynchronously);
                });
                await tcs.Task;
            }
            catch (Exception ex) {
                Error = ExceptionDispatchInfo.Capture(ex);
            }
            finally {
                ChangeOperationCount(-1);
            }
        }
        // Sets the continuation that will asynchronously execute when the pending operation counter
        // hits zero. Returns true if asynchronous execution is expected, false if the operation
        // counter is already at zero and the caller should run the continuation inline.
        public bool TrySetCompletionContinuation(Action continuation) {
            int newOperationCount = ChangeOperationCount(+1); // prevent the operation counter from hitting zero while we're setting the field
            bool scheduledAsynchronously = (newOperationCount > 1);
            if (scheduledAsynchronously) {
                Interlocked.Exchange(ref _completionTask, new Task(continuation));
            }

            ChangeOperationCount(-1);
            return scheduledAsynchronously;
        }
    }
}