File: _sequenceequal.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (124 lines) | stat: -rw-r--r-- 4,148 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from collections import deque
from typing import Callable, Deque, Iterable, List, Optional, TypeVar, Union

import reactivex
from reactivex import Observable, abc, typing
from reactivex.disposable import CompositeDisposable
from reactivex.internal import default_comparer

# Element type shared by the source sequence and the sequence it is compared to.
_T = TypeVar("_T")


def sequence_equal_(
    second: Union[Observable[_T], Iterable[_T]],
    comparer: Optional[typing.Comparer[_T]] = None,
) -> Callable[[Observable[_T]], Observable[bool]]:
    """Create an operator that checks a source sequence against ``second``.

    Args:
        second: Sequence to compare the source with. A plain iterable is
            wrapped with ``reactivex.from_iterable``; anything else is used
            as an observable as-is.
        comparer: Optional function ``(left, right) -> bool`` used to compare
            an element of the source (``left``) with the element of
            ``second`` at the same position (``right``). Falls back to
            ``default_comparer`` when omitted.

    Returns:
        A function that takes the source observable and returns an
        observable emitting a single boolean.
    """
    comparer_ = comparer or default_comparer
    # Plain iterables are converted so both sides can be subscribed to
    # uniformly below.
    second_ = (
        reactivex.from_iterable(second) if isinstance(second, Iterable) else second
    )

    def sequence_equal(source: Observable[_T]) -> Observable[bool]:
        """Determines whether two sequences are equal by comparing the
        elements pairwise using a specified equality comparer.

        Examples:
            >>> res = sequence_equal([1,2,3])
            >>> res = sequence_equal(
            ...     [{ "value": 42 }],
            ...     lambda x, y: x["value"] == y["value"]
            ... )
            >>> res = sequence_equal(reactivex.return_value(42))
            >>> res = sequence_equal(
            ...     reactivex.return_value({ "value": 42 }),
            ...     lambda x, y: x["value"] == y["value"]
            ... )

        Args:
            source: Source observable to compare.

        Returns:
            An observable sequence that contains a single element which
            indicates whether both sequences are of equal length and their
            corresponding elements are equal according to the specified
            equality comparer. An error from either sequence, or an
            exception raised by the comparer, is forwarded to the
            observer's ``on_error``.
        """
        first = source

        def subscribe(
            observer: abc.ObserverBase[bool],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> CompositeDisposable:
            # Single-element lists act as mutable cells that the nested
            # callbacks can update.
            donel = [False]  # the left (source) sequence has completed
            doner = [False]  # the right (second) sequence has completed
            # Elements received from one side that still wait for their
            # counterpart from the other side. Deques give O(1) removal
            # from the front.
            ql: Deque[_T] = deque()
            qr: Deque[_T] = deque()

            def emit(result: bool) -> None:
                # The verdict is always a single value followed by completion.
                observer.on_next(result)
                observer.on_completed()

            def on_next1(x: _T) -> None:
                if qr:
                    v = qr.popleft()
                    try:
                        # Always call the comparer as (left, right) so that
                        # the argument order does not depend on which
                        # sequence happened to emit first.
                        equal = comparer_(x, v)
                    except Exception as e:  # pylint: disable=broad-except
                        observer.on_error(e)
                        return

                    if not equal:
                        emit(False)

                elif doner[0]:
                    # The right side is finished and has nothing left to
                    # pair with: the left sequence is longer.
                    emit(False)
                else:
                    ql.append(x)

            def on_completed1() -> None:
                donel[0] = True
                if not ql:
                    if qr:
                        # Unmatched right elements remain: right is longer.
                        emit(False)
                    elif doner[0]:
                        emit(True)

            def on_next2(x: _T) -> None:
                if ql:
                    v = ql.popleft()
                    try:
                        equal = comparer_(v, x)
                    except Exception as e:  # pylint: disable=broad-except
                        observer.on_error(e)
                        return

                    if not equal:
                        emit(False)

                elif donel[0]:
                    # The left side is finished and has nothing left to
                    # pair with: the right sequence is longer.
                    emit(False)
                else:
                    qr.append(x)

            def on_completed2() -> None:
                doner[0] = True
                if not qr:
                    if ql:
                        # Unmatched left elements remain: left is longer.
                        emit(False)
                    elif donel[0]:
                        emit(True)

            subscription1 = first.subscribe(
                on_next1, observer.on_error, on_completed1, scheduler=scheduler
            )
            subscription2 = second_.subscribe(
                on_next2, observer.on_error, on_completed2, scheduler=scheduler
            )
            return CompositeDisposable(subscription1, subscription2)

        return Observable(subscribe)

    return sequence_equal


# Only the operator factory is exported from this module.
__all__ = ["sequence_equal_"]