File: generatewithrelativetime.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (89 lines) | stat: -rw-r--r-- 2,711 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from typing import Any, Callable, Optional, TypeVar, cast

from reactivex import Observable, abc
from reactivex.disposable import MultipleAssignmentDisposable
from reactivex.scheduler import TimeoutScheduler
from reactivex.typing import Mapper, Predicate, RelativeTime

# Type of the state value that is threaded through ``condition``, ``iterate``
# and ``time_mapper`` and emitted by the generated observable.
_TState = TypeVar("_TState")


def generate_with_relative_time_(
    initial_state: _TState,
    condition: Predicate[_TState],
    iterate: Mapper[_TState, _TState],
    time_mapper: Callable[[_TState], RelativeTime],
) -> Observable[_TState]:
    """Generates an observable sequence by iterating a state from an
    initial state until the condition fails, emitting each state after
    a relative delay computed from that state.

    On subscription an action is scheduled with zero delay. Each run of
    the action first emits the state accepted by the previous run (if
    any), then advances the state with ``iterate`` (skipped on the very
    first run, which evaluates ``initial_state``), and tests it with
    ``condition``. An accepted state is emitted after the delay returned
    by ``time_mapper`` for that state; a rejected state completes the
    sequence without being emitted.

    Any exception raised by ``iterate``, ``condition`` or
    ``time_mapper`` is forwarded to the observer via ``on_error`` and
    ends the sequence.

    Example:
        res = generate_with_relative_time_(
            0, lambda x: True, lambda x: x + 1, lambda x: 0.5
        )

    Args:
        initial_state: Initial state.
        condition: Condition to terminate generation (upon returning
            false).
        iterate: Iteration step function.
        time_mapper: Time mapper function to control the speed of
            values being produced each iteration, returning relative
            times, i.e. either floats denoting seconds or instances of
            timedelta. A zero delay (``0``, ``0.0`` or an empty
            timedelta) is valid.

    Returns:
        The generated sequence.
    """

    def subscribe(
        observer: abc.ObserverBase[_TState],
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        # Fall back to the shared timeout scheduler when the subscriber
        # does not supply one.
        scheduler = scheduler or TimeoutScheduler.singleton()
        # Holds the disposable of whichever action is currently scheduled.
        mad = MultipleAssignmentDisposable()
        state = initial_state
        has_result = False
        result: _TState = cast(_TState, None)
        first = True

        def action(scheduler: abc.SchedulerBase, _: Any) -> None:
            nonlocal state
            nonlocal has_result
            nonlocal result
            nonlocal first

            # Emit the value accepted by the previous run; its delay has
            # just elapsed.
            if has_result:
                observer.on_next(result)

            # Only read below when has_result is True, in which case it
            # has been overwritten by time_mapper in this same run.
            due_time: RelativeTime = 0.0

            try:
                if first:
                    # The first run evaluates the initial state as-is.
                    first = False
                else:
                    state = iterate(state)

                has_result = condition(state)

                if has_result:
                    result = state
                    due_time = time_mapper(state)

            except Exception as e:  # pylint: disable=broad-except
                observer.on_error(e)
                return

            if has_result:
                # No truthiness check on the delay: a zero delay is a
                # legitimate value and must not be rejected.
                mad.disposable = scheduler.schedule_relative(due_time, action)
            else:
                observer.on_completed()

        mad.disposable = scheduler.schedule_relative(0, action)
        return mad

    return Observable(subscribe)


# Explicit export list: the operator implementation is the only public name.
__all__ = ["generate_with_relative_time_"]