// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Reactive.Concurrency;
using System.Threading;
namespace System.Reactive.Linq
{
public static partial class Observable
{
#region + ObserveOn +
/// <summary>
/// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="scheduler">Scheduler to notify observers on.</param>
/// <returns>The source sequence whose observations happen on the specified scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
/// <remarks>
/// This only invokes observer callbacks on a scheduler. If the subscription and/or unsubscription actions have side effects
/// that must run on a scheduler, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, IScheduler)"/>.
/// </remarks>
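/// <example>
/// A minimal usage sketch, not a definitive pattern. The interval source and the choice of
/// Scheduler.Default are assumptions made for illustration only.
/// <code>
/// // Produce a value every second, then deliver the observer callbacks on Scheduler.Default.
/// var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
///     .ObserveOn(Scheduler.Default)
///     .Subscribe(x => Console.WriteLine(x));
///
/// // Disposing the subscription stops further notifications.
/// subscription.Dispose();
/// </code>
/// </example>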
public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return s_impl.ObserveOn<TSource>(source, scheduler);
}
#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="context">Synchronization context to notify observers on.</param>
/// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
/// <remarks>
/// This only invokes observer callbacks on a synchronization context. If the subscription and/or unsubscription actions have side effects
/// that must run on a synchronization context, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
/// </remarks>
public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException("source");
if (context == null)
throw new ArgumentNullException("context");
return s_impl.ObserveOn<TSource>(source, context);
}
#endif
#endregion
#region + SubscribeOn +
/// <summary>
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;
/// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>
/// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>
/// <remarks>
/// This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer
/// callbacks on a scheduler, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.
/// </remarks>
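/// <example>
/// A minimal usage sketch, not a definitive pattern. The Range source and the choice of
/// Scheduler.Default are assumptions made for illustration only.
/// <code>
/// // The call to Subscribe on the underlying source, and the later disposal of that
/// // subscription, are scheduled on Scheduler.Default instead of the calling thread.
/// var subscription = Observable.Range(1, 3)
///     .SubscribeOn(Scheduler.Default)
///     .Subscribe(x => Console.WriteLine(x));
/// </code>
/// </example>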
public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)
{
if (source == null)
throw new ArgumentNullException("source");
if (scheduler == null)
throw new ArgumentNullException("scheduler");
return s_impl.SubscribeOn<TSource>(source, scheduler);
}
#if !NO_SYNCCTX
/// <summary>
/// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;
/// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>
/// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>
/// <remarks>
/// This only performs the side-effects of subscription and unsubscription on the specified synchronization context. In order to invoke observer
/// callbacks on a synchronization context, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.
/// </remarks>
public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)
{
if (source == null)
throw new ArgumentNullException("source");
if (context == null)
throw new ArgumentNullException("context");
return s_impl.SubscribeOn<TSource>(source, context);
}
#endif
#endregion
#region + Synchronize +
/// <summary>
/// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently.
/// This overload is useful to "fix" an observable sequence that exhibits concurrent callbacks on individual observers, which is invalid behavior for the query processor.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
/// <remarks>
/// It's invalid behavior - according to the observer grammar - for a sequence to exhibit concurrent callbacks on a given observer.
/// This operator can be used to "fix" a source that doesn't conform to this rule.
/// </remarks>
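/// <example>
/// A minimal usage sketch. The variable "source" is hypothetical: it stands for any sequence
/// that may invoke its observer from several threads at once.
/// <code>
/// // Wrap the non-conforming source so that downstream observers see one callback at a time.
/// var safe = source.Synchronize();
/// safe.Subscribe(x => Console.WriteLine(x));
/// </code>
/// </example>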
public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source)
{
if (source == null)
throw new ArgumentNullException("source");
return s_impl.Synchronize<TSource>(source);
}
/// <summary>
/// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object.
/// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object.
/// </summary>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">Source sequence.</param>
/// <param name="gate">Gate object to synchronize each observer call on.</param>
/// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>
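/// <example>
/// A minimal usage sketch. The variables "left" and "right" are hypothetical sequences that
/// may produce values on different threads; the shared counter is for illustration only.
/// <code>
/// var gate = new object();
/// var count = 0;
///
/// // Both subscriptions deliver their callbacks while holding the same gate, so the two
/// // handlers never run concurrently and the shared counter is updated safely.
/// left.Synchronize(gate).Subscribe(_ => count++);
/// right.Synchronize(gate).Subscribe(_ => count++);
/// </code>
/// </example>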
public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, object gate)
{
if (source == null)
throw new ArgumentNullException("source");
if (gate == null)
throw new ArgumentNullException("gate");
return s_impl.Synchronize<TSource>(source, gate);
}
#endregion
}
}