File: _delaywithmapper.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (109 lines) | stat: -rw-r--r-- 3,511 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from typing import Any, Callable, Optional, TypeVar, Union

from reactivex import Observable, abc, typing
from reactivex.disposable import (
    CompositeDisposable,
    SerialDisposable,
    SingleAssignmentDisposable,
)

_T = TypeVar("_T")


def delay_with_mapper_(
    subscription_delay: Union[
        Observable[Any],
        typing.Mapper[Any, Observable[Any]],
        None,
    ] = None,
    delay_duration_mapper: Optional[typing.Mapper[_T, Observable[Any]]] = None,
) -> Callable[[Observable[_T]], Observable[_T]]:
    def delay_with_mapper(source: Observable[_T]) -> Observable[_T]:
        """Time shifts the observable sequence based on a subscription
        delay and a delay mapper function for each element.

        Examples:
            >>> obs = delay_with_selector(source)

        Args:
            subscription_delay: [Optional] Sequence indicating the
                delay for the subscription to the source.
            delay_duration_mapper: [Optional] Selector function to
                retrieve a sequence indicating the delay for each given
                element.

        Returns:
            Time-shifted observable sequence.
        """
        sub_delay: Optional[Observable[Any]] = None
        mapper: Optional[typing.Mapper[Any, Observable[Any]]] = None

        if isinstance(subscription_delay, abc.ObservableBase):
            mapper = delay_duration_mapper
            sub_delay = subscription_delay
        else:
            mapper = subscription_delay

        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            delays = CompositeDisposable()
            at_end = [False]

            def done():
                if at_end[0] and delays.length == 0:
                    observer.on_completed()

            subscription = SerialDisposable()

            def start():
                def on_next(x: _T) -> None:
                    try:
                        assert mapper
                        delay = mapper(x)
                    except Exception as error:  # pylint: disable=broad-except
                        observer.on_error(error)
                        return

                    d = SingleAssignmentDisposable()
                    delays.add(d)

                    def on_next(_: Any) -> None:
                        observer.on_next(x)
                        delays.remove(d)
                        done()

                    def on_completed() -> None:
                        observer.on_next(x)
                        delays.remove(d)
                        done()

                    d.disposable = delay.subscribe(
                        on_next, observer.on_error, on_completed, scheduler=scheduler
                    )

                def on_completed() -> None:
                    at_end[0] = True
                    subscription.dispose()
                    done()

                subscription.disposable = source.subscribe(
                    on_next, observer.on_error, on_completed, scheduler=scheduler
                )

            if not sub_delay:
                start()
            else:
                subscription.disposable = sub_delay.subscribe(
                    lambda _: start(), observer.on_error, start
                )

            return CompositeDisposable(subscription, delays)

        return Observable(subscribe)

    return delay_with_mapper


__all__ = ["delay_with_mapper_"]