File: withlatestfrom.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (59 lines) | stat: -rw-r--r-- 1,987 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from typing import Any, List, Optional, Tuple

from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal.utils import NotSet


def with_latest_from_(
    parent: Observable[Any], *sources: Observable[Any]
) -> Observable[Tuple[Any, ...]]:
    NO_VALUE = NotSet()

    def subscribe(
        observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        def subscribeall(
            parent: Observable[Any], *children: Observable[Any]
        ) -> List[SingleAssignmentDisposable]:

            values = [NO_VALUE for _ in children]

            def subscribechild(
                i: int, child: Observable[Any]
            ) -> SingleAssignmentDisposable:
                subscription = SingleAssignmentDisposable()

                def on_next(value: Any) -> None:
                    with parent.lock:
                        values[i] = value

                subscription.disposable = child.subscribe(
                    on_next, observer.on_error, scheduler=scheduler
                )
                return subscription

            parent_subscription = SingleAssignmentDisposable()

            def on_next(value: Any) -> None:
                with parent.lock:
                    if NO_VALUE not in values:
                        result = (value,) + tuple(values)
                        observer.on_next(result)

            children_subscription = [
                subscribechild(i, child) for i, child in enumerate(children)
            ]
            disp = parent.subscribe(
                on_next, observer.on_error, observer.on_completed, scheduler=scheduler
            )
            parent_subscription.disposable = disp

            return [parent_subscription] + children_subscription

        return CompositeDisposable(subscribeall(parent, *sources))

    return Observable(subscribe)


__all__ = ["with_latest_from_"]