1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Disposables;
using System.Threading;
using System.Runtime.CompilerServices;
using System.Reactive.Concurrency;
namespace System.Reactive.Subjects
{
/// <summary>
/// Represents the result of an asynchronous operation.
/// The last value before the OnCompleted notification, or the error received through OnError, is sent to all subscribed observers.
/// </summary>
/// <typeparam name="T">The type of the elements processed by the subject.</typeparam>
public sealed class AsyncSubject<T> : ISubject<T>, IDisposable
#if HAS_AWAIT
, INotifyCompletion
#endif
{
private readonly object _gate = new object();
private ImmutableList<IObserver<T>> _observers;
private bool _isDisposed;
private bool _isStopped;
private T _value;
private bool _hasValue;
private Exception _exception;
/// <summary>
/// Creates a subject that can only receive one value and that value is cached for all future observations.
/// </summary>
public AsyncSubject()
{
_observers = new ImmutableList<IObserver<T>>();
}
/// <summary>
/// Indicates whether the subject has observers subscribed to it.
/// </summary>
public bool HasObservers
{
get
{
var observers = _observers;
return observers != null && observers.Data.Length > 0;
}
}
/// <summary>
/// Notifies all subscribed observers about the end of the sequence, also causing the last received value to be sent out (if any).
/// </summary>
public void OnCompleted()
{
var os = default(IObserver<T>[]);
var v = default(T);
var hv = false;
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
os = _observers.Data;
_observers = new ImmutableList<IObserver<T>>();
_isStopped = true;
v = _value;
hv = _hasValue;
}
}
if (os != null)
{
if (hv)
{
foreach (var o in os)
{
o.OnNext(v);
o.OnCompleted();
}
}
else
foreach (var o in os)
o.OnCompleted();
}
}
/// <summary>
/// Notifies all subscribed observers about the exception.
/// </summary>
/// <param name="error">The exception to send to all observers.</param>
/// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException("error");
var os = default(IObserver<T>[]);
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
os = _observers.Data;
_observers = new ImmutableList<IObserver<T>>();
_isStopped = true;
_exception = error;
}
}
if (os != null)
foreach (var o in os)
o.OnError(error);
}
/// <summary>
/// Sends a value to the subject. The last value received before successful termination will be sent to all subscribed and future observers.
/// </summary>
/// <param name="value">The value to store in the subject.</param>
public void OnNext(T value)
{
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
_value = value;
_hasValue = true;
}
}
}
/// <summary>
/// Subscribes an observer to the subject.
/// </summary>
/// <param name="observer">Observer to subscribe to the subject.</param>
/// <returns>Disposable object that can be used to unsubscribe the observer from the subject.</returns>
/// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
throw new ArgumentNullException("observer");
var ex = default(Exception);
var v = default(T);
var hv = false;
lock (_gate)
{
CheckDisposed();
if (!_isStopped)
{
_observers = _observers.Add(observer);
return new Subscription(this, observer);
}
ex = _exception;
hv = _hasValue;
v = _value;
}
if (ex != null)
observer.OnError(ex);
else if (hv)
{
observer.OnNext(v);
observer.OnCompleted();
}
else
observer.OnCompleted();
return Disposable.Empty;
}
class Subscription : IDisposable
{
private readonly AsyncSubject<T> _subject;
private IObserver<T> _observer;
public Subscription(AsyncSubject<T> subject, IObserver<T> observer)
{
_subject = subject;
_observer = observer;
}
public void Dispose()
{
if (_observer != null)
{
lock (_subject._gate)
{
if (!_subject._isDisposed && _observer != null)
{
_subject._observers = _subject._observers.Remove(_observer);
_observer = null;
}
}
}
}
}
void CheckDisposed()
{
if (_isDisposed)
throw new ObjectDisposedException(string.Empty);
}
/// <summary>
/// Unsubscribe all observers and release resources.
/// </summary>
public void Dispose()
{
lock (_gate)
{
_isDisposed = true;
_observers = null;
_exception = null;
_value = default(T);
}
}
#if HAS_AWAIT
/// <summary>
/// Gets an awaitable object for the current AsyncSubject.
/// </summary>
/// <returns>Object that can be awaited.</returns>
public AsyncSubject<T> GetAwaiter()
{
return this;
}
/// <summary>
/// Specifies a callback action that will be invoked when the subject completes.
/// </summary>
/// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
/// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
public void OnCompleted(Action continuation)
{
if (continuation == null)
throw new ArgumentNullException("continuation");
OnCompleted(continuation, true);
}
#endif
private void OnCompleted(Action continuation, bool originalContext)
{
//
// [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
//
this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
}
class AwaitObserver : IObserver<T>
{
#if HAS_AWAIT
private readonly SynchronizationContext _context;
#endif
private readonly Action _callback;
public AwaitObserver(Action callback, bool originalContext)
{
#if HAS_AWAIT
if (originalContext)
_context = SynchronizationContext.Current;
#else
System.Diagnostics.Debug.Assert(!originalContext);
#endif
_callback = callback;
}
public void OnCompleted()
{
InvokeOnOriginalContext();
}
public void OnError(Exception error)
{
InvokeOnOriginalContext();
}
public void OnNext(T value)
{
}
private void InvokeOnOriginalContext()
{
#if HAS_AWAIT
if (_context != null)
{
//
// No need for OperationStarted and OperationCompleted calls here;
// this code is invoked through await support and will have a way
// to observe its start/complete behavior, either through returned
// Task objects or the async method builder's interaction with the
// SynchronizationContext object.
//
_context.Post(c => ((Action)c)(), _callback);
}
else
#endif
{
_callback();
}
}
}
/// <summary>
/// Gets whether the AsyncSubject has completed.
/// </summary>
public bool IsCompleted
{
get
{
return _isStopped;
}
}
/// <summary>
/// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
/// </summary>
/// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
/// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
public T GetResult()
{
if (!_isStopped)
{
var e = new ManualResetEvent(false);
OnCompleted(() => e.Set(), false);
e.WaitOne();
}
_exception.ThrowIfNotNull();
if (!_hasValue)
throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
return _value;
}
}
}
|