File: Producer.cs

package info (click to toggle)
mono 6.8.0.105%2Bdfsg-3.3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 1,284,512 kB
  • sloc: cs: 11,172,132; xml: 2,850,069; ansic: 671,653; cpp: 122,091; perl: 59,366; javascript: 30,841; asm: 22,168; makefile: 20,093; sh: 15,020; python: 4,827; pascal: 925; sql: 859; sed: 16; php: 1
file content (51 lines) | stat: -rw-r--r-- 3,187 bytes parent folder | download | duplicates (11)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive
{
    /// <summary>
    /// Common base type for query operator implementations. It is intended as a better-performing alternative to writing operators with <see cref="System.Reactive.Linq.Observable.Create">Observable.Create</see>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequence exposed by the producer.</typeparam>
    abstract class Producer<TSource> : IObservable<TSource>
    {
        /// <summary>
        /// Public subscription entry point. After validating the observer, it starts the operator by calling <see cref="Run"/>: directly when the current-thread scheduler does not require scheduling, otherwise as a work item scheduled on that scheduler.
        /// </summary>
        /// <param name="observer">Observer that receives the notifications. Producer implementations are responsible for honoring the message grammar on this observer.</param>
        /// <returns>Disposable that cancels the subscription. It combines the sink communicated by the operator with the disposable returned from <see cref="Run"/>, so disposing it reaches both.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            var sinkHolder = new SingleAssignmentDisposable();
            var subscriptionHolder = new SingleAssignmentDisposable();

            // Passed to Run so the operator can hand back its sink; the sink then becomes
            // reachable through the composite disposable returned to the caller.
            Action<IDisposable> publishSink = s => sinkHolder.Disposable = s;

            var currentThread = CurrentThreadScheduler.Instance;
            if (!currentThread.ScheduleRequired)
            {
                // No scheduling required: start the operator synchronously on the caller.
                subscriptionHolder.Disposable = this.Run(observer, subscriptionHolder, publishSink);
            }
            else
            {
                // Scheduling required: defer the Run call to the current-thread scheduler,
                // passing this producer as the work item's state.
                currentThread.Schedule(this, (scheduler, producer) =>
                {
                    var resources = producer.Run(observer, subscriptionHolder, publishSink);
                    subscriptionHolder.Disposable = resources;
                    return resources;
                });
            }

            var combined = new CompositeDisposable(2);
            combined.Add(sinkHolder);
            combined.Add(subscriptionHolder);
            return combined;
        }

        /// <summary>
        /// Operator-specific core logic, invoked once for every new subscription made to the producer.
        /// </summary>
        /// <param name="observer">Observer that receives the notifications. Producer implementations are responsible for honoring the message grammar on this observer.</param>
        /// <param name="cancel">Disposable that will hold the value returned from this method. It is supplied up front so the sink can dispose the subscription itself, either after sending a terminal message or earlier for other reasons.</param>
        /// <param name="setSink">Callback used to report the sink object back to the subscriber, so that a Dispose call from the consumer can be tunneled into the sink to stop processing.</param>
        /// <returns>Disposable covering all resources and/or subscriptions the operator uses while processing events.</returns>
        /// <remarks>No auto-detach protection is applied to the <paramref name="observer">observer</paramref> after an OnError or OnCompleted call. Implementations must dispose resources properly and enforce the message grammar themselves.</remarks>
        protected abstract IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink);
    }
}
#endif