File: SynchronizationHelper.cs

package info (click to toggle)
mono 6.8.0.105%2Bdfsg-3.3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,284,512 kB
  • sloc: cs: 11,172,132; xml: 2,850,069; ansic: 671,653; cpp: 122,091; perl: 59,366; javascript: 30,841; asm: 22,168; makefile: 20,093; sh: 15,020; python: 4,827; pascal: 925; sql: 859; sed: 16; php: 1
file content (221 lines) | stat: -rw-r--r-- 10,885 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
//------------------------------------------------------------------------------
// <copyright file="SynchronizationHelper.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//------------------------------------------------------------------------------

namespace System.Web.Util {
    using System;
    using System.Runtime.ExceptionServices;
    using System.Threading;
    using System.Threading.Tasks;

    // This class is used by the AspNetSynchronizationContext to assist with scheduling tasks in a non-blocking fashion.
    // Asynchronous work will be queued and will execute sequentially, never consuming more than a single thread at a time.
    // Synchronous work will block and will execute on the current thread.

    internal sealed class SynchronizationHelper {

        private Task _completionTask; // the Task that will run when all in-flight operations have completed; started by ChangeOperationCount when the counter reaches zero
        private Thread _currentThread; // the Thread that's running the current Task; all threads must see the same value for this field, so it is only touched through the CurrentThread property
        private Task _lastScheduledTask = CreateInitialTask(); // the last Task that was queued to this helper, used to hook future Tasks (not volatile since always accessed under lock)
        private Task _lastScheduledTaskAsync = CreateInitialTask(); // the last async Task that was queued to this helper by QueueAsynchronousAsync; kept as a separate chain from _lastScheduledTask
        private readonly object _lockObj = new object(); // synchronizes access to _lastScheduledTask and _lastScheduledTaskAsync
        private int _operationsInFlight; // operation counter; modified only via Interlocked.Add in ChangeOperationCount
        private readonly ISyncContext _syncContext; // a context that wraps an operation with pre- and post-execution phases (Enter / Leave); SafeWrapCallback skips both when this is null
        private readonly Action<bool> _appVerifierCallback; // for making sure that developers don't try calling us after the request has completed; no check is performed when this is null

        // Creates a helper bound to the given sync context.
        //   syncContext: the context whose Enter() / Leave() pair brackets every callback run by
        //                SafeWrapCallback. It is also handed to AppVerifier to obtain the optional
        //                request-state check delegate.
        public SynchronizationHelper(ISyncContext syncContext) {
            Action<bool> verifierCallback = AppVerifier.GetSyncContextCheckDelegate(syncContext);

            _syncContext = syncContext;
            _appVerifierCallback = verifierCallback;
        }

        // If an operation results in an exception, this property will provide access to it.
        // SafeWrapCallback and SafeWrapCallbackAsync overwrite it with the most recently captured
        // exception; nothing in this class resets it back to null, so callers that want to observe
        // a fresh failure are expected to clear it themselves through the setter.
        public ExceptionDispatchInfo Error { get; set; }

        // Thread-safe accessor for the _currentThread field.
        // Marking the field volatile would not be sufficient: volatile only constrains the ordering
        // of reads / writes and does not ensure that every thread observes the same value, so both
        // accessors go through Interlocked instead.
        private Thread CurrentThread {
            get {
                // Swapping null for null never modifies the field; the call is used purely as an
                // interlocked read of the current value.
                Thread owner = Interlocked.CompareExchange(ref _currentThread, null, null);
                return owner;
            }
            set {
                Interlocked.Exchange(ref _currentThread, value);
            }
        }

        // Returns the number of pending operations.
        // The value is obtained by adding zero through ChangeOperationCount, so reading this
        // property while the counter is at zero also starts a queued completion task, if any.
        public int PendingCount {
            get {
                int pending = ChangeOperationCount(0);
                return pending;
            }
        }

        // Atomically adds 'addend' (which may be negative or zero) to the in-flight operation
        // counter and returns the counter's new value. When the new value is zero, the completion
        // task registered by TrySetCompletionContinuation (if any) is claimed and started.
        public int ChangeOperationCount(int addend) {
            int updatedCount = Interlocked.Add(ref _operationsInFlight, addend);
            if (updatedCount != 0) {
                // operations are still outstanding; nothing to kick off yet
                return updatedCount;
            }

            // Take ownership of the queued completion task (clearing the field in the same atomic
            // step so that no other caller can start it a second time) and run it.
            Task pendingCompletion = Interlocked.Exchange(ref _completionTask, null);
            if (pendingCompletion != null) {
                pendingCompletion.Start();
            }

            return updatedCount;
        }

        // Invokes the app-verifier delegate obtained in the constructor, forwarding
        // 'checkForReEntry' to it. Does nothing when no delegate was supplied.
        private void CheckForRequestStateIfRequired(bool checkForReEntry) {
            Action<bool> verifier = _appVerifierCallback;
            if (verifier == null) {
                return;
            }

            verifier(checkForReEntry);
        }

        // Builds the already-completed Task that seeds each scheduling chain, giving the first
        // queued operation something to hang its continuation off of.
        private static Task CreateInitialTask() {
            Task<object> alreadyCompleted = Task.FromResult<object>(null);
            return alreadyCompleted;
        }

        // Synchronously takes control of this SynchronizationHelper instance. While control is held,
        // asynchronous operations are still accepted but are chained behind a marker Task that only
        // completes when the returned object is disposed, so none of them is dispatched before then.
        // This call might block if a different thread is currently in control of the context.
        // If the calling thread already owns the context, DisposableAction.Empty is returned and
        // nothing else happens.
        public IDisposable EnterSynchronousControl() {
            Thread caller = Thread.CurrentThread;
            if (CurrentThread == caller) {
                // Re-entrant call from the owning thread: there's nothing extra to do.
                return DisposableAction.Empty;
            }

            // Completing this source marks the end of the synchronous section.
            TaskCompletionSource<object> releaseSignal = new TaskCompletionSource<object>();
            Task predecessor;
            lock (_lockObj) {
                predecessor = _lastScheduledTask;
                _lastScheduledTask = releaseSignal.Task; // future work gets scheduled off this Task
            }

            if (!predecessor.IsCompleted) {
                // The predecessor may end up Faulted, in which case waiting on it directly would
                // throw. Wait on a do-nothing continuation instead, which is always guaranteed to
                // complete successfully.
                Task alwaysSucceeds = predecessor.ContinueWith(_ => { }, TaskContinuationOptions.ExecuteSynchronously);
                alwaysSucceeds.Wait();
            }

            CurrentThread = caller;

            // Disposing the returned object gives up ownership and lets the queued work proceed.
            return new DisposableAction(() => {
                CurrentThread = null;
                releaseSignal.TrySetResult(null);
            });
        }

        // Queues 'action' to run, wrapped by SafeWrapCallback, after the work that was most recently
        // scheduled on this helper. The operation counter is incremented here; SafeWrapCallback
        // decrements it again once the action has run.
        public void QueueAsynchronous(Action action) {
            CheckForRequestStateIfRequired(checkForReEntry: true);
            ChangeOperationCount(+1);

            Action<Task> runWrapped = _ => SafeWrapCallback(action);

            // Only scheduling happens inside the lock, never the work itself, so the lock is held
            // for a very short period of time.
            lock (_lockObj) {
                // the freshly created continuation becomes the new tail of the chain
                _lastScheduledTask = _lastScheduledTask.ContinueWith(runWrapped, TaskScheduler.Default);
            }
        }

        // QueueAsynchronousAsync and SafeWrapCallbackAsync guarantee:
        // 1. Funcs posted here complete in the order in which they were posted (first come, first complete).
        // 2. There is no overlapping execution.
        //
        //   func:  the asynchronous callback to run; it is invoked with 'state' as its only argument.
        //   state: an opaque value passed through to func.
        //
        // The operation counter is incremented here and decremented by SafeWrapCallbackAsync once
        // the Task returned by func has finished.
        public void QueueAsynchronousAsync(Func<object, Task> func, object state) {
            CheckForRequestStateIfRequired(checkForReEntry: true);
            ChangeOperationCount(+1);

            // This method only schedules work; it doesn't itself do any work. The lock is held for a very
            // short period of time.
            lock (_lockObj) {
                // 1. Note that we are chaining newTask with _lastScheduledTaskAsync, not _lastScheduledTask.
                // SafeWrapCallbackAsync queues its own work onto _lastScheduledTask and then waits for it,
                // so chaining newTask with _lastScheduledTask would cause a deadlock.
                // 2. Unwrap() is necessary here. When chaining tasks with ContinueWith, the result is a
                // Task<T> where T is the return type of the delegate passed to ContinueWith. As the return
                // type of an async delegate is a Task, we would otherwise end up with a Task<Task> that is
                // considered complete as soon as the async delegate hands back its Task, which happens at
                // the first await rather than when the delegate has actually finished.
                // 3. The scheduler is pinned to TaskScheduler.Default, exactly as QueueAsynchronous does.
                // Without it ContinueWith binds to TaskScheduler.Current, i.e. to whatever scheduler the
                // caller happens to be running on.
                Task newTask = _lastScheduledTaskAsync.ContinueWith(
                    async _ => { await SafeWrapCallbackAsync(func, state); },
                    TaskScheduler.Default).Unwrap();
                _lastScheduledTaskAsync = newTask; // the newly-created task is now the last one
            }
        }

        // Runs 'action' synchronously on the calling thread.
        // If the calling thread already owns the context the action is invoked directly (no counter
        // change, no exception capture) to prevent deadlocks. Otherwise the operation counter is
        // incremented, synchronous control is acquired (which may block), and the action is run
        // through SafeWrapCallback.
        public void QueueSynchronous(Action action) {
            CheckForRequestStateIfRequired(checkForReEntry: false);

            bool callerOwnsContext = (CurrentThread == Thread.CurrentThread);
            if (!callerOwnsContext) {
                ChangeOperationCount(+1);
                using (EnterSynchronousControl()) {
                    SafeWrapCallback(action);
                }
                return;
            }

            // current thread already owns the context, so just execute inline
            action();
        }

        // Runs 'action' as the current owner of the context, bracketed by the sync context's
        // Enter() / Leave() pair when a sync context is present.
        // Exceptions thrown by the action are captured into Error rather than bubbling up to our
        // callers. However, ThreadAbortExceptions will continue to bubble up.
        // Regardless of the outcome, ownership is released and the operation counter is decremented.
        private void SafeWrapCallback(Action action) {
            try {
                CurrentThread = Thread.CurrentThread;
                ISyncContextLock contextLock = null;
                try {
                    if (_syncContext != null) {
                        contextLock = _syncContext.Enter();
                    }

                    try {
                        action();
                    }
                    catch (Exception failure) {
                        Error = ExceptionDispatchInfo.Capture(failure);
                    }
                }
                finally {
                    // only leave a context that was actually entered
                    if (contextLock != null) {
                        contextLock.Leave();
                    }
                }
            }
            finally {
                CurrentThread = null;
                ChangeOperationCount(-1);
            }
        }

        // This method does not run the func by itself. It simply queues the func into the existing
        // syncContext queue (via QueueAsynchronous) and then waits, without blocking, until the Task
        // returned by func has finished.
        //
        //   func:  the asynchronous callback; invoked with 'state' from inside the queued action.
        //   state: an opaque value passed through to func.
        //
        // Any failure (func throwing, func returning a faulted / canceled Task, or the queueing call
        // itself throwing) is captured into Error. The operation counter is always decremented once,
        // balancing the increment done by QueueAsynchronousAsync.
        private async Task SafeWrapCallbackAsync(Func<object, Task> func, object state) {
            try {
                TaskCompletionSource<Task> tcs = new TaskCompletionSource<Task>();
                QueueAsynchronous(() => {
                    Task t;
                    try {
                        t = func(state);
                    }
                    catch (Exception ex) {
                        // func failed before handing back a Task. SafeWrapCallback would swallow this
                        // exception, so unless the completion source is signalled here the await below
                        // would never finish and the operation counter would never be decremented.
                        tcs.TrySetException(ex);
                        return;
                    }

                    if (t == null) {
                        // Same hazard as above: there is no Task to observe, so fail the wait explicitly.
                        tcs.TrySetException(new InvalidOperationException("The asynchronous callback returned a null Task."));
                        return;
                    }

                    // Mirror the outcome of func's Task onto the completion source.
                    t.ContinueWith((_) => {
                        if (t.IsFaulted) {
                            tcs.TrySetException(t.Exception.InnerExceptions);
                        }
                        else if (t.IsCanceled) {
                            tcs.TrySetCanceled();
                        }
                        else {
                            tcs.TrySetResult(t);
                        }
                    }, TaskContinuationOptions.ExecuteSynchronously);
                });
                await tcs.Task;
            }
            catch (Exception ex) {
                Error = ExceptionDispatchInfo.Capture(ex);
            }
            finally {
                ChangeOperationCount(-1);
            }
        }

        // Registers the continuation that will asynchronously execute when the pending operation
        // counter hits zero.
        // Returns true if asynchronous execution is expected; returns false if the operation counter
        // was already at zero, in which case nothing is registered and the caller should run the
        // continuation inline.
        public bool TrySetCompletionContinuation(Action continuation) {
            // Temporarily bump the counter so that it cannot hit zero while the field is being set.
            int countIncludingGuard = ChangeOperationCount(+1);

            // Anything above our own guard increment means real operations are still in flight.
            bool runsLater = countIncludingGuard > 1;
            if (runsLater) {
                Task deferred = new Task(continuation);
                Interlocked.Exchange(ref _completionTask, deferred);
            }

            // Dropping the guard may itself bring the counter to zero and start the task just stored.
            ChangeOperationCount(-1);
            return runsLater;
        }

    }
}