File: _groupbyuntil.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (177 lines) | stat: -rw-r--r-- 6,308 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from collections import OrderedDict
from typing import Any, Callable, Optional, TypeVar, cast

from reactivex import GroupedObservable, Observable, abc
from reactivex import operators as ops
from reactivex.disposable import (
    CompositeDisposable,
    RefCountDisposable,
    SingleAssignmentDisposable,
)
from reactivex.internal.basic import identity
from reactivex.subject import Subject
from reactivex.typing import Mapper

_T = TypeVar("_T")
_TKey = TypeVar("_TKey")
_TValue = TypeVar("_TValue")


def group_by_until_(
    key_mapper: Mapper[_T, _TKey],
    element_mapper: Optional[Mapper[_T, _TValue]],
    duration_mapper: Callable[[GroupedObservable[_TKey, _TValue]], Observable[Any]],
    subject_mapper: Optional[Callable[[], Subject[_TValue]]] = None,
) -> Callable[[Observable[_T]], Observable[GroupedObservable[_TKey, _TValue]]]:
    """Groups the elements of an observable sequence according to a
    specified key mapper function. A duration mapper function is used
    to control the lifetime of groups. When a group expires, it receives
    an OnCompleted notification. When a new element with the same key
    value as a reclaimed group occurs, the group will be reborn with a
    new lifetime request.

    Examples:
        >>> group_by_until(lambda x: x.id, None, lambda : reactivex.never())
        >>> group_by_until(
            lambda x: x.id,lambda x: x.name, lambda grp: reactivex.never()
        )
        >>> group_by_until(
            lambda x: x.id,
            lambda x: x.name,
            lambda grp: reactivex.never(),
            lambda: ReplaySubject()
        )

    Args:
        key_mapper: A function to extract the key for each element.
        duration_mapper: A function to signal the expiration of a group.
        subject_mapper: A function that returns a subject used to initiate
            a grouped observable. Default mapper returns a Subject object.

    Returns: a sequence of observable groups, each of which corresponds to
    a unique key value, containing all elements that share that same key
    value. If a group's lifetime expires, a new group with the same key
    value can be created once an element with such a key value is
    encountered.
    """

    element_mapper_ = element_mapper or cast(Mapper[_T, _TValue], identity)

    default_subject_mapper: Callable[[], Subject[_TValue]] = lambda: Subject()
    subject_mapper_ = subject_mapper or default_subject_mapper

    def group_by_until(
        source: Observable[_T],
    ) -> Observable[GroupedObservable[_TKey, _TValue]]:
        def subscribe(
            observer: abc.ObserverBase[GroupedObservable[_TKey, _TValue]],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            writers: OrderedDict[_TKey, Subject[_TValue]] = OrderedDict()
            group_disposable = CompositeDisposable()
            ref_count_disposable = RefCountDisposable(group_disposable)

            def on_next(x: _T) -> None:
                writer = None
                key = None

                try:
                    key = key_mapper(x)
                except Exception as e:
                    for wrt in writers.values():
                        wrt.on_error(e)

                    observer.on_error(e)
                    return

                fire_new_map_entry = False
                writer = writers.get(key)
                if not writer:
                    try:
                        writer = subject_mapper_()
                    except Exception as e:
                        for wrt in writers.values():
                            wrt.on_error(e)

                        observer.on_error(e)
                        return

                    writers[key] = writer
                    fire_new_map_entry = True

                if fire_new_map_entry:
                    group: GroupedObservable[_TKey, _TValue] = GroupedObservable(
                        key, writer, ref_count_disposable
                    )
                    duration_group: GroupedObservable[_TKey, Any] = GroupedObservable(
                        key, writer
                    )
                    try:
                        duration = duration_mapper(duration_group)
                    except Exception as e:
                        for wrt in writers.values():
                            wrt.on_error(e)

                        observer.on_error(e)
                        return

                    observer.on_next(group)
                    sad = SingleAssignmentDisposable()
                    group_disposable.add(sad)

                    def expire() -> None:
                        if writers[key]:
                            del writers[key]
                            writer.on_completed()

                        group_disposable.remove(sad)

                    def on_next(value: Any) -> None:
                        pass

                    def on_error(exn: Exception) -> None:
                        for wrt in writers.values():
                            wrt.on_error(exn)
                        observer.on_error(exn)

                    def on_completed() -> None:
                        expire()

                    sad.disposable = duration.pipe(
                        ops.take(1),
                    ).subscribe(on_next, on_error, on_completed, scheduler=scheduler)

                try:
                    element = element_mapper_(x)
                except Exception as error:
                    for wrt in writers.values():
                        wrt.on_error(error)

                    observer.on_error(error)
                    return

                writer.on_next(element)

            def on_error(ex: Exception) -> None:
                for wrt in writers.values():
                    wrt.on_error(ex)

                observer.on_error(ex)

            def on_completed() -> None:
                for wrt in writers.values():
                    wrt.on_completed()

                observer.on_completed()

            group_disposable.add(
                source.subscribe(on_next, on_error, on_completed, scheduler=scheduler)
            )
            return ref_count_disposable

        return Observable(subscribe)

    return group_by_until


__all__ = ["group_by_until_"]