File: WebSocketPipe.cs

package info (click to toggle)
mono 6.12.0.199%2Bdfsg-6
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,296,836 kB
  • sloc: cs: 11,181,803; xml: 2,850,076; ansic: 699,709; cpp: 123,344; perl: 59,361; javascript: 30,841; asm: 21,853; makefile: 20,405; sh: 15,009; python: 4,839; pascal: 925; sql: 859; sed: 16; php: 1
file content (237 lines) | stat: -rw-r--r-- 11,737 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//------------------------------------------------------------------------------
// <copyright file="WebSocketPipe.cs" company="Microsoft">
//     Copyright (c) Microsoft Corporation.  All rights reserved.
// </copyright>
//------------------------------------------------------------------------------

namespace System.Web.WebSockets {
    using System;
    using System.Diagnostics.CodeAnalysis;
    using System.Net.WebSockets;
    using System.Runtime.InteropServices;
    using System.Security.Permissions;
    using System.Threading.Tasks;
    using System.Web.Util;

    // Sends and receives WebSocket messages through the unmanaged WebSocket context.
    // Every operation is started asynchronously and surfaced to the caller as a Task.

    [SecurityPermission(SecurityAction.LinkDemand, UnmanagedCode = true)]
    internal sealed class WebSocketPipe : IWebSocketPipe {

        // Per-operation managed continuation (may be a closure); the static thunk forwards to it
        private delegate void CompletionCallback(int hrError, int cbIO, bool fUtf8Encoded, bool fFinalFragment, bool fClose);

        // Managed counterpart of the unmanaged PFN_WEBSOCKET_COMPLETION delegate
        private delegate void CompletionCallbackThunk(int hrError, IntPtr pvCompletionContext, int cbIO, bool fUtf8Encoded, bool fFinalFragment, bool fClose);

        // Kept in a static field so the delegate itself stays rooted and is not collected while unmanaged code is executing
        private static readonly CompletionCallbackThunk _asyncThunk = AsyncCallbackThunk;

        // Function pointer for _asyncThunk that is handed to the unmanaged layer (declared after _asyncThunk because it is derived from it)
        [SuppressMessage("Microsoft.Reliability", "CA2006:UseSafeHandleToEncapsulateNativeResources", Justification = @"This is a function pointer whose lifetime lasts for the entire duration of this AppDomain. We never need to release it.")]
        private static readonly IntPtr _asyncThunkAddress = Marshal.GetFunctionPointerForDelegate(_asyncThunk);

        private readonly IUnmanagedWebSocketContext _context;
        private readonly IPerfCounters _perfCounters;

        // context: the unmanaged WebSocket context that all operations are forwarded to
        // perfCounters: receives the counts of bytes written and read
        internal WebSocketPipe(IUnmanagedWebSocketContext context, IPerfCounters perfCounters) {
            _context = context;
            _perfCounters = perfCounters;
        }

        // Starts an asynchronous write of a single fragment.
        // The returned Task completes once the completion callback has run; it faults with a
        // WebSocketException when the reported HRESULT is negative.
        public Task WriteFragmentAsync(ArraySegment<byte> buffer, bool isUtf8Encoded, bool isFinalFragment) {
            TaskCompletionSource<object> completion = new TaskCompletionSource<object>();

            // Unmanaged code reads from the buffer asynchronously, so it has to remain pinned until completion
            PinnedArraySegment<byte> pinned = new PinnedArraySegment<byte>(buffer);

            // Always invoked, even on synchronous completion, because it is responsible for cleanup
            CompletionCallback onCompleted = (hrError, cbIO, fUtf8Encoded, fFinalFragment, fClose) => {
                try {
                    SettleFromHResult(completion, hrError);
                }
                finally {
                    // Release the pin no matter how the operation ended, otherwise the buffer would leak
                    pinned.Dispose();
                }
            };
            IntPtr rootedCallback = GCUtil.RootObject(onCompleted);

            // Perf counter: number of bytes being written to the wire
            _perfCounters.IncrementCounter(AppPerfCounter.REQUEST_BYTES_OUT_WEBSOCKETS, pinned.Count);

            // Hand off to the underlying implementation; WriteFragment should never throw an exception
            int byteCount = pinned.Count;
            bool isPending;
            int hresult = _context.WriteFragment(
                    pData: pinned.Pointer,
                    pcbSent: ref byteCount,
                    fAsync: true,
                    fUtf8Encoded: isUtf8Encoded,
                    fFinalFragment: isFinalFragment,
                    pfnCompletion: _asyncThunkAddress,
                    pvCompletionContext: rootedCallback,
                    pfCompletionExpected: out isPending);

            if (!isPending) {
                // Finished synchronously (or failed up front): no native callback will arrive, so run the thunk ourselves
                AsyncCallbackThunk(hresult, rootedCallback, byteCount, isUtf8Encoded, isFinalFragment, fClose: false);
            }

            return completion.Task;
        }

        // Starts an asynchronous send of a CLOSE frame carrying the given status code and description.
        // The returned Task completes once the completion callback has run; it faults with a
        // WebSocketException when the reported HRESULT is negative.
        public Task WriteCloseFragmentAsync(WebSocketCloseStatus closeStatus, string statusDescription) {
            TaskCompletionSource<object> completion = new TaskCompletionSource<object>();

            // Always invoked, even on synchronous completion; nothing is pinned here, so it only settles the task
            CompletionCallback onCompleted = (hrError, cbIO, fUtf8Encoded, fFinalFragment, fClose) => {
                SettleFromHResult(completion, hrError);
            };
            IntPtr rootedCallback = GCUtil.RootObject(onCompleted);

            // Hand off to the underlying implementation; SendConnectionClose should never throw an exception
            bool isPending;
            int hresult = _context.SendConnectionClose(
                fAsync: true,
                uStatusCode: (ushort)closeStatus,
                szReason: statusDescription, // no pinning needed: the CLR marshaler converts the string, and IIS keeps its own copy for the duration of the async operation
                pfnCompletion: _asyncThunkAddress,
                pvCompletionContext: rootedCallback,
                pfCompletionExpected: out isPending);

            if (!isPending) {
                // Finished synchronously (or failed up front): no native callback will arrive, so run the thunk ourselves
                AsyncCallbackThunk(hresult, rootedCallback, cbIO: 0, fUtf8Encoded: true, fFinalFragment: true, fClose: false);
            }

            return completion.Task;
        }

        // Starts an asynchronous read of a single fragment into the supplied buffer.
        // The returned Task yields a WebSocketReceiveResult describing the fragment (or the CLOSE frame),
        // or faults with a WebSocketException when a negative HRESULT is reported.
        public Task<WebSocketReceiveResult> ReadFragmentAsync(ArraySegment<byte> buffer) {
            TaskCompletionSource<WebSocketReceiveResult> completion = new TaskCompletionSource<WebSocketReceiveResult>();

            // Unmanaged code writes into the buffer asynchronously, so it has to remain pinned until completion
            PinnedArraySegment<byte> pinned = new PinnedArraySegment<byte>(buffer);

            // Always invoked, even on synchronous completion, because it is responsible for cleanup
            CompletionCallback onCompleted = (hrError, cbIO, fUtf8Encoded, fFinalFragment, fClose) => {
                try {
                    ThrowExceptionForHR(hrError);
                    completion.TrySetResult(CreateReceiveResult(cbIO, fUtf8Encoded, fFinalFragment, fClose));
                }
                catch (Exception error) {
                    completion.TrySetException(error); // exceptional completion
                }
                finally {
                    // Release the pin no matter how the operation ended, otherwise the buffer would leak
                    pinned.Dispose();
                }
            };
            IntPtr rootedCallback = GCUtil.RootObject(onCompleted);

            // Hand off to the underlying implementation; ReadFragment should never throw an exception
            int byteCount = pinned.Count;
            bool isText;
            bool isLastFragment;
            bool isCloseFrame;
            bool isPending;
            int hresult = _context.ReadFragment(
                pData: pinned.Pointer,
                pcbData: ref byteCount,
                fAsync: true,
                pfUtf8Encoded: out isText,
                pfFinalFragment: out isLastFragment,
                pfConnectionClose: out isCloseFrame,
                pfnCompletion: _asyncThunkAddress,
                pvCompletionContext: rootedCallback,
                pfCompletionExpected: out isPending);

            if (!isPending) {
                // Finished synchronously (or failed up front): no native callback will arrive, so run the thunk ourselves
                AsyncCallbackThunk(hresult, rootedCallback, byteCount, isText, isLastFragment, isCloseFrame);
            }

            return completion.Task;
        }

        // Translates the values reported for a successfully completed read into a WebSocketReceiveResult.
        // A CLOSE frame additionally carries the close status and description obtained from the context;
        // a data frame adds its byte count to the bytes-in perf counter.
        private WebSocketReceiveResult CreateReceiveResult(int byteCount, bool isText, bool isLastFragment, bool isCloseFrame) {
            if (isCloseFrame) {
                WebSocketCloseStatus status;
                string description;
                GetCloseStatus(out status, out description);

                return new WebSocketReceiveResult(
                    count: byteCount,
                    messageType: WebSocketMessageType.Close,
                    endOfMessage: isLastFragment,
                    closeStatus: status,
                    closeStatusDescription: description);
            }

            // Data frame: record how many bytes were read from the wire
            _perfCounters.IncrementCounter(AppPerfCounter.REQUEST_BYTES_IN_WEBSOCKETS, byteCount);

            return new WebSocketReceiveResult(
                count: byteCount,
                messageType: isText ? WebSocketMessageType.Text : WebSocketMessageType.Binary,
                endOfMessage: isLastFragment,
                closeStatus: null,
                closeStatusDescription: null);
        }

        // Settles a result-less operation: success when the HRESULT is non-negative,
        // otherwise the task faults with the WebSocketException raised for that HRESULT.
        private static void SettleFromHResult(TaskCompletionSource<object> completion, int hrError) {
            try {
                ThrowExceptionForHR(hrError);
                completion.TrySetResult(null); // regular completion
            }
            catch (Exception error) {
                completion.TrySetException(error); // exceptional completion
            }
        }

        // Retrieves the reason (numeric + textual) the client gave when it sent a CLOSE frame to the server.
        // When the context reports E_NOT_SET (no status code was provided), closeStatus is set to 0 and
        // closeStatusDescription to null. Any other failure HRESULT is thrown as a WebSocketException.
        private void GetCloseStatus(out WebSocketCloseStatus closeStatus, out string closeStatusDescription) {
            ushort rawStatus;
            IntPtr reasonChars;
            ushort reasonCharCount;
            int hresult = _context.GetCloseStatus(out rawStatus, out reasonChars, out reasonCharCount);

            if (hresult != HResults.E_NOT_SET) {
                // Every HRESULT other than E_NOT_SET is subject to the normal failure check
                ThrowExceptionForHR(hresult);
            }
            else {
                // E_NOT_SET is special-cased: it means the peer supplied no status code
                rawStatus = 0;
                reasonChars = IntPtr.Zero;
            }

            string description = null;
            if (reasonChars != IntPtr.Zero) {
                unsafe {
                    // Take a managed copy of the text (IIS will free the original memory)
                    description = new String((char*)reasonChars, 0, reasonCharCount);
                }
            }

            closeStatus = (WebSocketCloseStatus)rawStatus;
            closeStatusDescription = description;
        }

        // Forwards the request to close the TCP connection to the unmanaged context
        public void CloseTcpConnection() {
            _context.CloseTcpConnection();
        }

        // Entry point invoked when an asynchronous operation completes (and called directly on synchronous completion);
        // it recovers the per-operation callback from the completion context and dispatches to it
        private static void AsyncCallbackThunk(int hrError, IntPtr pvCompletionContext, int cbIO, bool fUtf8Encoded, bool fFinalFragment, bool fClose) {
            // Unrooting also makes the callback and everything it references eligible for garbage collection
            object unrooted = GCUtil.UnrootObject(pvCompletionContext);
            CompletionCallback continuation = (CompletionCallback)unrooted;
            continuation(hrError, cbIO, fUtf8Encoded, fFinalFragment, fClose);
        }

        // Homogenizes failures coming from the native layer: a negative HRESULT becomes a WebSocketException
        private static void ThrowExceptionForHR(int hrError) {
            if (hrError >= 0) {
                return;
            }

            throw new WebSocketException(hrError);
        }

    }
}