1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;
namespace System.Reactive.Linq.ObservableImpl
{
class Next<TSource> : PushToPullAdapter<TSource, TSource>
{
public Next(IObservable<TSource> source)
: base(source)
{
}
protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
{
return new _(subscription);
}
class _ : PushToPullSink<TSource, TSource>
{
private readonly object _gate;
#if !NO_CDS
private readonly SemaphoreSlim _semaphore;
#else
private readonly Semaphore _semaphore;
#endif
public _(IDisposable subscription)
: base(subscription)
{
_gate = new object();
#if !NO_CDS
_semaphore = new SemaphoreSlim(0, 1);
#else
_semaphore = new Semaphore(0, 1);
#endif
}
private bool _waiting;
private NotificationKind _kind;
private TSource _value;
private Exception _error;
public override void OnNext(TSource value)
{
lock (_gate)
{
if (_waiting)
{
_value = value;
_kind = NotificationKind.OnNext;
_semaphore.Release();
}
_waiting = false;
}
}
public override void OnError(Exception error)
{
base.Dispose();
lock (_gate)
{
//
// BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
//
_error = error;
_kind = NotificationKind.OnError;
if (_waiting)
_semaphore.Release();
_waiting = false;
}
}
public override void OnCompleted()
{
base.Dispose();
lock (_gate)
{
//
// BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
//
_kind = NotificationKind.OnCompleted;
if (_waiting)
_semaphore.Release();
_waiting = false;
}
}
public override bool TryMoveNext(out TSource current)
{
var done = false;
lock (_gate)
{
_waiting = true;
//
// BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
//
done = _kind != NotificationKind.OnNext;
}
if (!done)
{
#if !NO_CDS
_semaphore.Wait();
#else
_semaphore.WaitOne();
#endif
}
//
// When we reach this point, we released the lock and got the next notification
// from the observer. We assume no concurrent calls to the TryMoveNext method
// are made (per general guidance on usage of IEnumerable<T>). If the observer
// enters the lock again, it should have quit it first, causing _waiting to be
// set to false, hence future accesses of the lock won't set the _kind, _value,
// and _error fields, until TryMoveNext is entered again and _waiting is reset
// to true. In conclusion, the fields are stable for read below.
//
// Notice we rely on memory barrier acquire/release behavior due to the use of
// the semaphore, not the lock (we're still under the lock when we release the
// semaphore in the On* methods!).
//
switch (_kind)
{
case NotificationKind.OnNext:
current = _value;
return true;
case NotificationKind.OnError:
_error.Throw();
break;
case NotificationKind.OnCompleted:
break;
}
current = default(TSource);
return false;
}
}
}
}
#endif
|