File: _join.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (144 lines) | stat: -rw-r--r-- 4,592 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from collections import OrderedDict
from typing import Any, Callable, Optional, Tuple, TypeVar

from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal import noop
from reactivex.operators import take

# Element types of the two joined sequences: _T1 for the left (source)
# observable and _T2 for the right observable passed to ``join_``.
_T1 = TypeVar("_T1")
_T2 = TypeVar("_T2")


def join_(
    right: Observable[_T2],
    left_duration_mapper: Callable[[Any], Observable[Any]],
    right_duration_mapper: Callable[[Any], Observable[Any]],
) -> Callable[[Observable[_T1]], Observable[Tuple[_T1, _T2]]]:
    """Create an operator joining a source with ``right`` on overlapping durations.

    Every value from either side opens a "window" that stays open until the
    observable returned by the matching duration mapper emits its first item
    or completes. A value arriving on one side is paired with every value on
    the other side whose window is still open at that moment.

    Args:
        right: The right-hand observable to correlate the source with.
        left_duration_mapper: Called with each source (left) value; returns
            the observable that bounds that value's window.
        right_duration_mapper: Called with each ``right`` value; returns the
            observable that bounds that value's window.

    Returns:
        A function that takes the source (left) observable and returns an
        observable of ``(left_value, right_value)`` tuples.
    """

    def join(source: Observable[_T1]) -> Observable[Tuple[_T1, _T2]]:
        """Correlates the elements of two sequences based on
        overlapping durations.

        Args:
            source: Source observable.

        Return:
            An observable sequence that contains elements
            combined into a tuple from source elements that have an overlapping
            duration.
        """

        left = source

        def subscribe(
            observer: abc.ObserverBase[Tuple[_T1, _T2]],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            """Subscribe to both inputs and emit pairs of overlapping values.

            Errors from either input, from any duration observable, or raised
            by a duration mapper are forwarded to ``observer.on_error``.
            """
            # Holds the two input subscriptions plus one disposable per
            # currently open duration window.
            group = CompositeDisposable()
            left_done = False
            # Values whose window is still open, keyed by arrival index.
            left_map: OrderedDict[int, _T1] = OrderedDict()
            left_id = 0
            right_done = False
            right_map: OrderedDict[int, _T2] = OrderedDict()
            right_id = 0

            def on_next_left(value: _T1) -> None:
                nonlocal left_id
                current_id = left_id
                left_id += 1
                md = SingleAssignmentDisposable()

                # Register the value before its duration is subscribed.
                left_map[current_id] = value
                group.add(md)

                def expire() -> None:
                    # Close this value's window; finish the output when the
                    # left side is done and no left window remains open.
                    left_map.pop(current_id, None)
                    if not left_map and left_done:
                        observer.on_completed()

                    group.remove(md)

                try:
                    duration = left_duration_mapper(value)
                except Exception as exception:
                    observer.on_error(exception)
                    return

                # take(1) makes the first emission (or completion) of the
                # duration observable end the window.
                md.disposable = duration.pipe(take(1)).subscribe(
                    noop, observer.on_error, expire, scheduler=scheduler
                )

                # Iterate over a snapshot: observer.on_next may re-enter and
                # add or expire right values, and mutating an OrderedDict
                # while iterating its live view raises RuntimeError.
                for val in list(right_map.values()):
                    observer.on_next((value, val))

            def on_completed_left() -> None:
                nonlocal left_done
                left_done = True
                # No further pairs are possible if the right side is finished
                # or there is no open left window.
                if right_done or not left_map:
                    observer.on_completed()

            group.add(
                left.subscribe(
                    on_next_left,
                    observer.on_error,
                    on_completed_left,
                    scheduler=scheduler,
                )
            )

            def on_next_right(value: _T2) -> None:
                nonlocal right_id
                current_id = right_id
                right_id += 1
                md = SingleAssignmentDisposable()
                right_map[current_id] = value
                group.add(md)

                def expire() -> None:
                    # Mirror of the left-side expiry for right values.
                    right_map.pop(current_id, None)
                    if not right_map and right_done:
                        observer.on_completed()

                    group.remove(md)

                try:
                    duration = right_duration_mapper(value)
                except Exception as exception:
                    observer.on_error(exception)
                    return

                md.disposable = duration.pipe(take(1)).subscribe(
                    noop, observer.on_error, expire, scheduler=scheduler
                )

                # Snapshot for the same re-entrancy reason as on the left side.
                for val in list(left_map.values()):
                    observer.on_next((val, value))

            def on_completed_right() -> None:
                nonlocal right_done
                right_done = True
                if left_done or not right_map:
                    observer.on_completed()

            group.add(
                right.subscribe(
                    on_next_right,
                    observer.on_error,
                    on_completed_right,
                    scheduler=scheduler,
                )
            )
            return group

        return Observable(subscribe)

    return join


# Public names of this module: only the ``join_`` operator factory.
__all__ = ["join_"]