File: MultipleReceiveBinder.cs

package info (click to toggle)
mono 4.6.2.7%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 778,148 kB
  • ctags: 914,052
  • sloc: cs: 5,779,509; xml: 2,773,713; ansic: 432,645; sh: 14,749; makefile: 12,361; perl: 2,488; python: 1,434; cpp: 849; asm: 531; sql: 95; sed: 16; php: 1
file content (386 lines) | stat: -rw-r--r-- 13,863 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
//-----------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation.  All rights reserved.
//-----------------------------------------------------------------------------

namespace System.ServiceModel.Dispatcher
{

    using System;
    using System.Collections.Generic;
    using System.ServiceModel.Diagnostics;
    using System.Runtime;
    using System.ServiceModel.Channels;
    using System.Threading;

    // IChannelBinder wrapper that keeps several BeginTryReceive calls pending on the inner
    // binder (as many as the queue size passed to the constructor) while exposing a
    // one-receive-at-a-time BeginTryReceive/EndTryReceive pair to its caller.
    // Every other IChannelBinder member forwards directly to the inner binder.
    class MultipleReceiveBinder : IChannelBinder
    {
        internal static class MultipleReceiveDefaults
        {
            // Not referenced inside this class; presumably the default used by callers that
            // configure the number of pending receives -- confirm at the usage sites.
            internal const int MaxPendingReceives = 1;
        }

        // Single callback instance shared by every inner BeginTryReceive call.
        static readonly AsyncCallback onInnerReceiveCompleted = Fx.ThunkCallback(OnInnerReceiveCompleted);

        // The result returned by the latest BeginTryReceive that has not been completed yet;
        // null whenever no outer receive is in flight.
        MultipleReceiveAsyncResult outstanding;

        // Set once by the constructor and never reassigned.
        readonly IChannelBinder channelBinder;
        readonly ReceiveScopeQueue pendingResults;
        readonly bool ordered;

        // channelBinder: the inner binder every call is forwarded to.
        // size: capacity of the pending-receive queue, i.e. how many inner receives the pump keeps posted.
        // ordered: selects how completed inner receives are matched to gates (see SignalReceiveCompleted).
        public MultipleReceiveBinder(IChannelBinder channelBinder, int size, bool ordered)
        {
            this.ordered = ordered;
            this.channelBinder = channelBinder;
            this.pendingResults = new ReceiveScopeQueue(size);
        }

        public IChannel Channel
        {
            get { return this.channelBinder.Channel; }
        }

        public bool HasSession
        {
            get { return this.channelBinder.HasSession; }
        }

        public Uri ListenUri
        {
            get { return this.channelBinder.ListenUri; }
        }

        public EndpointAddress LocalAddress
        {
            get { return this.channelBinder.LocalAddress; }
        }

        public EndpointAddress RemoteAddress
        {
            get { return this.channelBinder.RemoteAddress; }
        }

        public void Abort()
        {
            this.channelBinder.Abort();
        }

        public void CloseAfterFault(TimeSpan timeout)
        {
            this.channelBinder.CloseAfterFault(timeout);
        }

        // The synchronous receive goes straight to the inner binder; it does not use the pending queue.
        public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
        {
            return this.channelBinder.TryReceive(timeout, out requestContext);
        }

        // Starts one outer receive. Tops up the inner receives via EnsurePump, then tries to
        // take the head of the queue; if the head has already completed, the returned result
        // is completed synchronously before this method returns.
        public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
        {
            // At any time there can be only one thread in BeginTryReceive, and the
            // outstanding AsyncResult should have completed before the next one starts.
            // There should be no pending outstanding result here.
            Fx.AssertAndThrow(this.outstanding == null, "BeginTryReceive should not have a pending result.");

            MultipleReceiveAsyncResult multipleReceiveResult = new MultipleReceiveAsyncResult(callback, state);
            this.outstanding = multipleReceiveResult;
            EnsurePump(timeout);
            IAsyncResult innerResult;
            if (this.pendingResults.TryDequeueHead(out innerResult))
            {
                HandleReceiveRequestComplete(innerResult, true);
            }

            return multipleReceiveResult;
        }

        // Posts inner BeginTryReceive calls (all with the same timeout) until the queue is full.
        void EnsurePump(TimeSpan timeout)
        {
            // Ensure we're running at full throttle. The BeginTryReceive calls we make below on the
            // IChannelBinder will typically complete future calls to BeginTryReceive made by ChannelHandler;
            // a corollary is that most of the time these calls will be completed synchronously.
            while (!this.pendingResults.IsFull)
            {
                ReceiveScopeSignalGate receiveScope = new ReceiveScopeSignalGate(this);

                // Enqueue the result without locks since this is the pump. 
                // BeginTryReceive can be called only from one thread and 
                // the head is not yet unlocked so no items can proceed.
                this.pendingResults.Enqueue(receiveScope);
                IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, onInnerReceiveCompleted, receiveScope);
                if (result.CompletedSynchronously)
                {
                    // The callback ignores synchronous completions, so the result is recorded here.
                    // The return value is deliberately unused: presumably no gate is unlocked at this
                    // point, and BeginTryReceive collects the result through TryDequeueHead.
                    this.SignalReceiveCompleted(result);
                }
            }
        }

        // Callback for inner receives. Synchronous completions are handled by EnsurePump,
        // so only asynchronous completions are processed here.
        static void OnInnerReceiveCompleted(IAsyncResult nestedResult)
        {
            if (nestedResult.CompletedSynchronously)
            {
                return;
            }

            // Direct cast (as in SignalReceiveCompleted): a state object of the wrong type
            // fails here with an InvalidCastException instead of a later null dereference.
            ReceiveScopeSignalGate thisPtr = (ReceiveScopeSignalGate)nestedResult.AsyncState;
            thisPtr.Binder.HandleReceiveAndSignalCompletion(nestedResult, false);
        }

        // Records the completed inner receive and, only when the signal reports that the
        // result may proceed, completes the outstanding outer result with it.
        void HandleReceiveAndSignalCompletion(IAsyncResult nestedResult, bool completedSynchronosly)
        {
            if (SignalReceiveCompleted(nestedResult))
            {
                HandleReceiveRequestComplete(nestedResult, completedSynchronosly);
            }
        }

        // Returns true when the completed inner receive should be delivered right away.
        private bool SignalReceiveCompleted(IAsyncResult nestedResult)
        {
            if (this.ordered)
            {
                // Ordered receives can proceed only if their own gate has
                // been unlocked. The head is the only gate that gets unlocked, so only
                // the result that owns the gate at the head can proceed.
                return this.pendingResults.TrySignal((ReceiveScopeSignalGate)nestedResult.AsyncState, nestedResult);
            }
            else
            {
                // Unordered receives can proceed with any gate. If the head
                // has not been unlocked by BeginTryReceive then the result is
                // stored on the next pending gate.
                return this.pendingResults.TrySignalPending(nestedResult);
            }
        }

        // Ends the inner receive and completes the outstanding outer result with its outcome.
        // A non-fatal exception from EndTryReceive is passed to the outer result instead of
        // being thrown from here.
        void HandleReceiveRequestComplete(IAsyncResult innerResult, bool completedSynchronously)
        {
            MultipleReceiveAsyncResult receiveResult = this.outstanding;

            // Checked before the try block on purpose: if this assertion were raised inside it
            // and its exception were not fatal according to Fx.IsFatal, the catch below would
            // capture it and the Complete call would then dereference the null result.
            Fx.AssertAndThrow(receiveResult != null, "HandleReceive invoked without an outstanding result");

            // Cleanup states
            this.outstanding = null;

            Exception completionException = null;

            try
            {
                // set the context on the outer result for the ChannelHandler.
                RequestContext context;
                receiveResult.Valid = this.channelBinder.EndTryReceive(innerResult, out context);
                receiveResult.RequestContext = context;
            }
            catch (Exception ex)
            {
                if (Fx.IsFatal(ex))
                {
                    throw;
                }

                completionException = ex;
            }

            receiveResult.Complete(completedSynchronously, completionException);
        }

        public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
        {
            return MultipleReceiveAsyncResult.End(result, out requestContext);
        }

        public RequestContext CreateRequestContext(Message message)
        {
            return this.channelBinder.CreateRequestContext(message);
        }

        public void Send(Message message, TimeSpan timeout)
        {
            this.channelBinder.Send(message, timeout);
        }

        public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginSend(message, timeout, callback, state);
        }

        public void EndSend(IAsyncResult result)
        {
            this.channelBinder.EndSend(result);
        }

        public Message Request(Message message, TimeSpan timeout)
        {
            return this.channelBinder.Request(message, timeout);
        }

        public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginRequest(message, timeout, callback, state);
        }

        public Message EndRequest(IAsyncResult result)
        {
            return this.channelBinder.EndRequest(result);
        }

        public bool WaitForMessage(TimeSpan timeout)
        {
            return this.channelBinder.WaitForMessage(timeout);
        }

        public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.channelBinder.BeginWaitForMessage(timeout, callback, state);
        }

        public bool EndWaitForMessage(IAsyncResult result)
        {
            return this.channelBinder.EndWaitForMessage(result);
        }

        // Outer async result handed to the caller of BeginTryReceive. Carries the values
        // that EndTryReceive returns: the success flag and the received RequestContext.
        class MultipleReceiveAsyncResult : AsyncResult
        {
            public MultipleReceiveAsyncResult(AsyncCallback callback, object state)
                : base(callback, state)
            {
            }

            // Return value of the inner EndTryReceive.
            public bool Valid
            {
                get;
                set;
            }

            // Context produced by the inner EndTryReceive.
            public RequestContext RequestContext
            {
                get;
                set;
            }

            // Re-exposes the base Complete as public so the enclosing binder can call it.
            public new void Complete(bool completedSynchronously, Exception completionException)
            {
                base.Complete(completedSynchronously, completionException);
            }

            public static bool End(IAsyncResult result, out RequestContext context)
            {
                MultipleReceiveAsyncResult thisPtr = AsyncResult.End<MultipleReceiveAsyncResult>(result);
                context = thisPtr.RequestContext;
                return thisPtr.Valid;
            }
        }

        // Gate used as the AsyncState of each inner receive. It carries the owning binder so
        // the static completion callback can get back to the instance.
        class ReceiveScopeSignalGate : SignalGate<IAsyncResult>
        {
            public ReceiveScopeSignalGate(MultipleReceiveBinder binder)
            {
                this.Binder = binder;
            }

            public MultipleReceiveBinder Binder
            {
                get;
                private set;
            }
        }

        class ReceiveScopeQueue
        {
            // This class is a circular queue with 2 pointers, one for pending items and one for the head.
            // Ordered receives: the head is unlocked by BeginTryReceive. A completed receive can signal
            // only the gate that it owns. If that gate is the head then it will proceed.
            // Unordered receives: any pending item can be signalled. The pending index keeps track
            // of results that haven't been completed. If the head is unlocked then it will proceed.

            int pending;
            int head;
            int count;
            readonly int size;
            readonly ReceiveScopeSignalGate[] items;

            public ReceiveScopeQueue(int size)
            {
                this.size = size;
                this.head = 0;
                this.count = 0;
                this.pending = 0;
                items = new ReceiveScopeSignalGate[size];
            }

            internal bool IsFull
            {
                get { return this.count == this.size; }
            }

            internal void Enqueue(ReceiveScopeSignalGate receiveScope)
            {
                // This should only be called from EnsurePump, which itself should only be called
                // from BeginTryReceive. This makes sure that we don't need locks to enqueue an item.
                Fx.AssertAndThrow(this.count < this.size, "Cannot Enqueue into a full queue.");
                this.items[(this.head + this.count) % this.size] = receiveScope;
                count++;
            }

            void Dequeue()
            {
                // Dequeue should not be called outside a signal/unlock boundary.
                // There are no locks as this boundary ensures that only one thread
                // tries to dequeue an item, either in the unlock or in the signal thread.
                Fx.AssertAndThrow(this.count > 0, "Cannot Dequeue and empty queue.");
                this.items[head] = null;
                this.head = (head + 1) % this.size;
                this.count--;
            }

            internal bool TryDequeueHead(out IAsyncResult result)
            {
                // Invoked only from BeginTryReceive, as only the main thread can
                // dequeue the head. It is successful only if the head has already been signaled and completed.
                Fx.AssertAndThrow(this.count > 0, "Cannot unlock item when queue is empty");
                if (this.items[head].Unlock(out result))
                {
                    this.Dequeue();
                    return true;
                }

                return false;
            }

            public bool TrySignal(ReceiveScopeSignalGate scope, IAsyncResult nestedResult)
            {
                // Ordered receives can only signal the gate that the AsyncResult owns.
                // If the head has already been unlocked then it can proceed.
                if (scope.Signal(nestedResult))
                {
                    Dequeue();
                    return true;
                }

                return false;
            }

            public bool TrySignalPending(IAsyncResult result)
            {
                // The pending index wraps around and always yields the next index to signal.
                // Only the head of the queue can proceed, as the head would be unlocked by
                // BeginTryReceive. All other requests will just submit their completed result.
                int nextPending = GetNextPending();
                if (this.items[nextPending].Signal(result))
                {
                    Dequeue();
                    return true;
                }

                return false;
            }

            // Atomically claims the current pending index and advances the shared index by one,
            // wrapping at the queue size. Returns the claimed index.
            int GetNextPending()
            {
                int expected = this.pending;
                while (true)
                {
                    int observed = Interlocked.CompareExchange(ref this.pending, (expected + 1) % this.size, expected);
                    if (observed == expected)
                    {
                        // The swap succeeded, so this caller owns the slot it read.
                        return expected;
                    }

                    // Another thread advanced the index first; retry from the value it left behind.
                    expected = observed;
                }
            }
        }
    }
}