File: QueryLanguage.Joins.cs

package info (click to toggle)
mono-reference-assemblies 3.12.1%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 604,240 kB
  • ctags: 625,505
  • sloc: cs: 3,967,741; xml: 2,793,081; ansic: 418,042; java: 60,435; sh: 14,833; makefile: 11,576; sql: 7,956; perl: 1,467; cpp: 1,446; yacc: 1,203; python: 598; asm: 422; sed: 16; php: 1
file content (84 lines) | stat: -rw-r--r-- 3,097 bytes parent folder | download | duplicates (9)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Joins;

namespace System.Reactive.Linq
{
    internal partial class QueryLanguage
    {
        #region And

        public virtual Pattern<TLeft, TRight> And<TLeft, TRight>(IObservable<TLeft> left, IObservable<TRight> right)
        {
            return new Pattern<TLeft, TRight>(left, right);
        }

        #endregion

        #region Then

        public virtual Plan<TResult> Then<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
        {
            return new Pattern<TSource>(source).Then(selector);
        }

        #endregion

        #region When

        public virtual IObservable<TResult> When<TResult>(params Plan<TResult>[] plans)
        {
            return When((IEnumerable<Plan<TResult>>)plans);
        }

        public virtual IObservable<TResult> When<TResult>(IEnumerable<Plan<TResult>> plans)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                var externalSubscriptions = new Dictionary<object, IJoinObserver>();
                var gate = new object();
                var activePlans = new List<ActivePlan>();
                var outObserver = Observer.Create<TResult>(observer.OnNext,
                    exception =>
                    {
                        foreach (var po in externalSubscriptions.Values)
                        {
                            po.Dispose();
                        }
                        observer.OnError(exception);
                    },
                    observer.OnCompleted);
                try
                {
                    foreach (var plan in plans)
                        activePlans.Add(plan.Activate(externalSubscriptions, outObserver,
                                                      activePlan =>
                                                      {
                                                          activePlans.Remove(activePlan);
                                                          if (activePlans.Count == 0)
                                                              outObserver.OnCompleted();
                                                      }));
                }
                catch (Exception e)
                {
                    //
                    // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
                    //
                    return Throw<TResult>(e).Subscribe/*Unsafe*/(observer);
                }

                var group = new CompositeDisposable(externalSubscriptions.Values.Count);
                foreach (var joinObserver in externalSubscriptions.Values)
                {
                    joinObserver.Subscribe(gate);
                    group.Add(joinObserver);
                }
                return group;
            });
        }

        #endregion
    }
}