1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
//
// BREAKING CHANGE v2 > v1.x - FromEvent[Pattern] now has an implicit SubscribeOn and Publish operation.
//
// The free-threaded nature of Rx is key to the performance characteristics of the event processing
// pipeline. However, in places where we bridge with the external world, this sometimes has negative
// effects due to thread-affine operations involved. The FromEvent[Pattern] bridges are one such
// place where we reach out to add and remove operations on events.
//
// Consider the following piece of code, assuming Rx v1.x usage:
//
// var txt = Observable.FromEventPattern(txtInput, "TextChanged");
// var res = from term in txt
// from word in svc.Lookup(term).TakeUntil(txt)
// select word;
//
// This code is flawed for various reasons. Seasoned Rx developers will immediately suggest usage of
// the Publish operator to share the side-effects of subscribing to the txt sequence, resulting in
// only one subscription to the event:
//
// var txt = Observable.FromEventPattern(txtInput, "TextChanged");
// var res = txt.Publish(txt_ => from term in txt_
// from word in svc.Lookup(term).TakeUntil(txt_)
// select word);
//
// Customers are typically confused as to why FromEvent[Pattern] causes multiple handlers to be added
// to the underlying event. This is in contrast with other From* bridges which involve the use of a
// subject (e.g. FromAsyncPattern, FromAsync, and ToObservable on Task<T>).
//
// But there are more issues with the code fragment above. Upon completion of the svc.Lookup(term)
// sequence, TakeUntil will unsubscribe from both sequences, causing the unsubscription to happen in
// the context of the source's OnCompleted, which may be the thread pool. Some thread-affine events
// don't quite like this. In UI frameworks like WPF and Silverlight, this turns out to be not much of
// a problem typically, but it's merely an accident things work out. From an e-mail conversation with
// the WPF/SL/Jupiter experts:
//
// "Unfortunately, as I expected, it’s confusing, and implementation details are showing through.
// The bottom line is that event add/remove should always be done on the right thread.
//
// Where events are implemented with compiler-generated code, i.e. MultiCastDelegate, the add/remove
// will be thread safe/agile. Where events are implemented in custom code, across Wpf/SL/WP/Jupiter,
// the add/remove are expected to happen on the Dispatcher thread.
//
// Jupiter actually has the consistent story here, where all the event add/remove implementations do
// the thread check. It should still be a “wrong thread” error, though, not an AV.
//
// In SL there’s a mix of core events (which do the thread check) and framework events (which use
// compiler-generated event implementations). So you get an exception if you unhook Button.Loaded
// from off thread, but you don’t get an exception if you unhook Button.Click.
//
// In WPF there’s a similar mix (some events are compiler-generated and some use the EventHandlerStore).
// But I don’t see any thread safety or thread check in the EventHandlerStore. So while it works, IIUC,
// it should have race conditions and corruptions."
//
// Starting with "Jupiter" (Windows XAML aka "Metro"), checks are added to ensure the add and remove
// operations for UI events are called from the UI thread. As a result, the dictionary suggest sample
// code shown above starts to fail. A possible fix is to use SubscribeOnDispatcher:
//
// var txt = Observable.FromEventPattern(txtInput, "TextChanged").SubscribeOnDispatcher();
// var res = from term in txt
// from word in svc.Lookup(term).TakeUntil(txt)
// select word;
//
// This fix has two problems:
//
// 1. Customers often don't quite understand the difference between ObserveOn and SubscribeOn. In fact,
// we've given guidance that use of the latter is typically indicative of a misunderstanding, and
// is used rarely. Also, the fragment above would likely be extended with some UI binding code where
// one needs to use ObserveOnDispatcher, so the combination of both becomes even more confusing.
//
// 2. There's a subtle race condition now. Upon receiving a new term from the txt sequence, SelectMany's
// invocation of the result selector involves TakeUntil subscribing to txt again. However, the use
// of SubscribeOnDispatcher means the subscription is now happening asynchronously, leaving a time
// gap between returning from Subscribe and doing the += on the underlying event:
//
// (Subscription of TakeUntil to txt)
// |
// v
// txt --------------------------------------------------------------
// |
// +-----...----+ (SubscribeOnDispatcher's post of Subscribe)
// |
// TextChanged ------"re"---------"rea"-------------"reac"-----"react"----...
// ^
// |
// (where += on the event happens)
//
// While this problem is rare and sometimes gets mitigated by accident because code is posting back
// to e.g. the UI message loop, it's extremely hard to debug when things go wrong.
//
// In order to fix this behavior such that code has the expected behavior, we do two things in Rx v2.0:
//
// - To solve the cross-thread add/remove handler operations and make them single-thread affine, we
// now do an implicit SubscribeOn with the SynchronizationContext.Current retrieved eagerly upon
// calling FromEvent[Pattern]. This goes hand-in-hand with a recommendation:
//
// "Always call FromEvent[Pattern] in a place where you'd normally write += and -= operations
// yourself. Don't inline the creation of a FromEvent[Pattern] object inside a query."
//
// This recommendation helps to keep code clean (bridging operations are moved outside queries) and
// ensures the captured SynchronizationContext is the least surprising one. E.g. in the sample code
// above, the whole query likely lives in a button_Click handler or so.
//
// - To solve the time gap issue, we now add implicit Publish behavior with ref-counted behavior. In
// other words, the new FromEvent[Pattern] is pretty much the same as:
//
// Observable_v2.FromEvent[Pattern](<args>)
// ==
// Observable_v1.FromEvent[Pattern](<args>).SubscribeOn(SynchronizationContext.Current)
// .Publish()
// .RefCount()
//
// Overloads to FromEvent[Pattern] allow specifying the scheduler used for the SubscribeOn operation
// that's taking place internally. When omitted, a SynchronizationContextScheduler will be supplied
// if a current SynchronizationContext is found. If no current SynchronizationContext is found, the
// default scheduler is the immediate scheduler, falling back to the free-threaded behavior we had
// before in v1.x. (See GetSchedulerForCurrentContext in QueryLanguage.Events.cs).
//
// Notice a time gap can still occur at the point of the first subscription to the event sequence,
// or when the ref count fell back to zero. In cases of nested uses of the sequence (such as in the
// running example here), this is fine because the top-level subscription is kept alive for the whole
// duration. In other cases, there's already a race condition between the underlying event and the
// observable wrapper (assuming events are hot). For cold events that have side-effects upon add and
// remove handler operations, use of Observable.Create is recommended. This should be rather rare,
// as most events follow the typical MulticastDelegate implementation pattern:
//
// public event EventHandler<BarEventArgs> Bar;
//
// protected void OnBar(int value)
// {
// var bar = Bar;
// if (bar != null)
// bar(this, new BarEventArgs(value));
// }
//
// In here, there's already a race between the user hooking up an event handler through the += add
// operation and the event producer (possibly on a different thread) calling OnBar. It's also worth
// pointing out that this race condition is mitigated by a check in SynchronizationContextScheduler
// causing synchronous execution in case the caller is already on the target SynchronizationContext.
// This situation is common when using FromEvent[Pattern] immediately after declaring it, e.g. in
// the context of a UI event handler.
//
// Finally, notice we can't simply connect the event to a Subject<T> upon a FromEvent[Pattern] call,
// because this would make it impossible to get rid of this one event handler (unless we expose some
// other means of resource maintenance, e.g. by making the returned object implement IDisposable).
// Also, this would cause the event producer to see the event's delegate in a non-null state all the
// time, causing event argument objects to be newed up, possibly sending those into a zero-observer
// subject (which is opaque to the event producer). Not to mention that the subject would always be
// rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable).
//
namespace System.Reactive.Linq.ObservableImpl
{
/// <summary>
/// Event producer that turns an <see cref="Action{TEventArgs}"/> callback into a handler of type
/// <typeparamref name="TDelegate"/>, either through a caller-supplied conversion function or, when
/// none was given, through <c>ReflectionUtils.CreateDelegate</c>.
/// </summary>
/// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
/// <typeparam name="TEventArgs">Type of the values pushed to observers.</typeparam>
class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs>
{
    // Optional conversion from the observer's OnNext callback to the event's delegate type.
    // Stays null when the instance is created through the constructor without a conversion.
    private readonly Func<Action<TEventArgs>, TDelegate> _conversion;

    /// <summary>
    /// Creates a producer without a conversion function; handlers are built via reflection.
    /// </summary>
    /// <param name="addHandler">Action that attaches a handler to the underlying event.</param>
    /// <param name="removeHandler">Action that detaches a handler from the underlying event.</param>
    /// <param name="scheduler">Scheduler passed on to the base class.</param>
    public FromEvent(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
        : base(addHandler, removeHandler, scheduler)
    {
    }

    /// <summary>
    /// Creates a producer that uses <paramref name="conversion"/> to build the event handler.
    /// </summary>
    /// <param name="conversion">Function mapping the OnNext callback onto a <typeparamref name="TDelegate"/>.</param>
    /// <param name="addHandler">Action that attaches a handler to the underlying event.</param>
    /// <param name="removeHandler">Action that detaches a handler from the underlying event.</param>
    /// <param name="scheduler">Scheduler passed on to the base class.</param>
    public FromEvent(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
        : base(addHandler, removeHandler, scheduler)
    {
        _conversion = conversion;
    }

    /// <summary>
    /// Produces the handler that will be attached to the event for the given callback.
    /// </summary>
    /// <param name="onNext">Callback that receives the event data.</param>
    /// <returns>A <typeparamref name="TDelegate"/> wrapping <paramref name="onNext"/>.</returns>
    protected override TDelegate GetHandler(Action<TEventArgs> onNext)
    {
        // A user-supplied conversion takes precedence over the reflection-based path.
        if (_conversion != null)
        {
            return _conversion(onNext);
        }

        // No conversion available: hand the callback and the Invoke method of its delegate
        // type to ReflectionUtils, which builds the TDelegate instance.
        var invokeMethod = typeof(Action<TEventArgs>).GetMethod("Invoke");
        return ReflectionUtils.CreateDelegate<TDelegate>(onNext, invokeMethod);
    }
}
/// <summary>
/// Base class for observable sequences backed by an event. All observers share one handler on the
/// underlying event through a ref-counted <see cref="Session"/>; adding and removing that handler
/// is dispatched through the scheduler supplied at construction time.
/// </summary>
/// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
/// <typeparam name="TArgs">Type of the values pushed to observers.</typeparam>
abstract class EventProducer<TDelegate, TArgs> : Producer<TArgs>
{
    private readonly IScheduler _scheduler;

    // Guards _session as well as the ref count and handler state inside the session.
    private readonly object _gate;

    /// <summary>
    /// Creates the producer.
    /// </summary>
    /// <param name="scheduler">Scheduler on which handler add and remove operations are scheduled.</param>
    public EventProducer(IScheduler scheduler)
    {
        _scheduler = scheduler;
        _gate = new object();
    }

    /// <summary>
    /// Builds the delegate to attach to the event; it is handed the shared subject's OnNext.
    /// </summary>
    /// <param name="onNext">Callback that receives the event data.</param>
    /// <returns>The handler to pass to <see cref="AddHandler"/>.</returns>
    protected abstract TDelegate GetHandler(Action<TArgs> onNext);

    /// <summary>
    /// Attaches <paramref name="handler"/> to the underlying event.
    /// </summary>
    /// <param name="handler">Handler obtained from <see cref="GetHandler"/>.</param>
    /// <returns>A disposable that is disposed (via the scheduler) once the last observer is gone.</returns>
    protected abstract IDisposable AddHandler(TDelegate handler);

    // Current session, or null when no observer is connected. Accessed under _gate.
    private Session _session;

    /// <summary>
    /// Connects <paramref name="observer"/> to the current session, creating one if needed.
    /// </summary>
    /// <param name="observer">Observer to connect.</param>
    /// <param name="cancel">Not used by this implementation.</param>
    /// <param name="setSink">Not used by this implementation.</param>
    /// <returns>A disposable that disconnects the observer and releases its reference on the session.</returns>
    protected override IDisposable Run(IObserver<TArgs> observer, IDisposable cancel, Action<IDisposable> setSink)
    {
        var connection = default(IDisposable);

        lock (_gate)
        {
            //
            // A session object holds on to a single handler to the underlying event, feeding
            // into a subject. It also ref counts the number of connections to the subject.
            //
            // When the ref count goes back to zero, the event handler is unregistered, and
            // the session will reach out to reset the _session field to null under the _gate
            // lock. Future subscriptions will cause a new session to be created.
            //
            if (_session == null)
                _session = new Session(this);

            connection = _session.Connect(observer);
        }

        return connection;
    }

    /// <summary>
    /// Holds the single event handler, the subject it feeds, and the number of connected observers.
    /// </summary>
    class Session
    {
        private readonly EventProducer<TDelegate, TArgs> _parent;
        private readonly Subject<TArgs> _subject;

        // Created by Initialize; receives the disposable returned by the parent's AddHandler.
        private SingleAssignmentDisposable _removeHandler;

        // Number of observers currently connected; mutated under _parent._gate.
        private int _count;

        public Session(EventProducer<TDelegate, TArgs> parent)
        {
            _parent = parent;
            _subject = new Subject<TArgs>();
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the subject and, for the first connection,
        /// registers the event handler. Must be called while holding the parent's gate.
        /// </summary>
        /// <param name="observer">Observer to connect.</param>
        /// <returns>
        /// A disposable releasing the connection, or an empty disposable when initialization
        /// failed and the error was forwarded to <paramref name="observer"/>.
        /// </returns>
        public IDisposable Connect(IObserver<TArgs> observer)
        {
            /*
             * CALLERS - Ensure this is called under the lock!
             *
            lock (_parent._gate) */
            {
                //
                // We connect the given observer to the subject first, before performing any kind
                // of initialization which will register an event handler. This is done to ensure
                // we don't have a time gap between adding the handler and connecting the user's
                // subject, e.g. when the ImmediateScheduler is used.
                //
                // [OK] Use of unsafe Subscribe: called on a known subject implementation.
                //
                var connection = _subject.Subscribe/*Unsafe*/(observer);

                if (++_count == 1)
                {
                    try
                    {
                        Initialize();
                    }
                    catch (Exception exception)
                    {
                        --_count;
                        connection.Dispose();

                        //
                        // Initialize may have assigned _removeHandler before it failed, so this
                        // session must not be initialized a second time. Drop it from the parent
                        // so the next subscription creates a fresh session. This is done before
                        // notifying the observer, because OnError may re-subscribe synchronously
                        // on this thread and would otherwise pick up this session again.
                        //
                        _parent._session = null;

                        observer.OnError(exception);
                        return Disposable.Empty;
                    }
                }

                return Disposable.Create(() =>
                {
                    connection.Dispose();

                    lock (_parent._gate)
                    {
                        if (--_count == 0)
                        {
                            // Last observer gone: remove the handler through the scheduler and
                            // retire this session so a later subscription starts a new one.
                            _parent._scheduler.Schedule(_removeHandler.Dispose);
                            _parent._session = null;
                        }
                    }
                });
            }
        }

        /// <summary>
        /// Builds the event handler and schedules its registration. Must be called while holding
        /// the parent's gate.
        /// </summary>
        private void Initialize()
        {
            /*
             * CALLERS - Ensure this is called under the lock!
             *
            lock (_parent._gate) */
            {
                //
                // When the ref count goes to zero, no-one should be able to perform operations on
                // the session object anymore, because it gets nulled out.
                //
                Debug.Assert(_removeHandler == null);
                _removeHandler = new SingleAssignmentDisposable();

                //
                // Conversion code is supposed to be a pure function and shouldn't be run on the
                // scheduler, but the add handler call should. Notice the scheduler can be the
                // ImmediateScheduler, causing synchronous invocation. This is the default when
                // no SynchronizationContext is found (see QueryLanguage.Events.cs and search for
                // the GetSchedulerForCurrentContext method).
                //
                var onNext = _parent.GetHandler(_subject.OnNext);
                _parent._scheduler.Schedule(onNext, AddHandler);
            }
        }

        /// <summary>
        /// Scheduler callback that attaches the handler to the event. A failure while attaching
        /// is forwarded to the subject's OnError channel instead of being thrown.
        /// </summary>
        /// <param name="self">Scheduler invoking the callback; not used.</param>
        /// <param name="onNext">Handler to attach.</param>
        /// <returns>Always an empty disposable.</returns>
        private IDisposable AddHandler(IScheduler self, TDelegate onNext)
        {
            var removeHandler = default(IDisposable);
            try
            {
                removeHandler = _parent.AddHandler(onNext);
            }
            catch (Exception exception)
            {
                _subject.OnError(exception);
                return Disposable.Empty;
            }

            //
            // We don't propagate the exception to the OnError channel upon Dispose. This is
            // not possible at this stage, because we've already auto-detached in the base
            // class Producer implementation. Even if we would switch the OnError and auto-
            // detach calls, it wouldn't work because the remove handler logic is scheduled
            // on the given scheduler, causing asynchrony. We can't block waiting for the
            // remove handler to run on the scheduler.
            //
            _removeHandler.Disposable = removeHandler;

            return Disposable.Empty;
        }
    }
}
/// <summary>
/// Event producer whose attach and detach operations are supplied as a pair of actions.
/// </summary>
/// <typeparam name="TDelegate">Delegate type of the underlying event.</typeparam>
/// <typeparam name="TArgs">Type of the values pushed to observers.</typeparam>
abstract class ClassicEventProducer<TDelegate, TArgs> : EventProducer<TDelegate, TArgs>
{
    private readonly Action<TDelegate> _addHandler;
    private readonly Action<TDelegate> _removeHandler;

    /// <summary>
    /// Stores the add and remove actions and forwards the scheduler to the base class.
    /// </summary>
    /// <param name="addHandler">Action that attaches a handler to the underlying event.</param>
    /// <param name="removeHandler">Action that detaches a handler from the underlying event.</param>
    /// <param name="scheduler">Scheduler passed on to the base class.</param>
    public ClassicEventProducer(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
        : base(scheduler)
    {
        _addHandler = addHandler;
        _removeHandler = removeHandler;
    }

    /// <summary>
    /// Attaches <paramref name="handler"/> right away and returns a disposable that detaches it.
    /// </summary>
    /// <param name="handler">Handler to attach to the event.</param>
    /// <returns>A disposable that invokes the remove action with the same handler.</returns>
    protected override IDisposable AddHandler(TDelegate handler)
    {
        _addHandler(handler);

        // The matching removal is deferred until the returned disposable is disposed.
        Action detach = () => _removeHandler(handler);
        return Disposable.Create(detach);
    }
}
}
#endif
|