File: range.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (70 lines) | stat: -rw-r--r-- 2,151 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
from sys import maxsize
from typing import Iterator, Optional

from reactivex import Observable, abc
from reactivex.disposable import MultipleAssignmentDisposable
from reactivex.scheduler import CurrentThreadScheduler


def range_(
    start: int,
    stop: Optional[int] = None,
    step: Optional[int] = None,
    scheduler: Optional[abc.SchedulerBase] = None,
) -> Observable[int]:
    """Generates an observable sequence of integral numbers within a
    specified range, using the specified scheduler to send out observer
    messages.

    Examples:
        >>> res = range(10)
        >>> res = range(0, 10)
        >>> res = range(0, 10, 1)

    Args:
        start: The value of the first integer in the sequence.
        stop: [Optional] Generate number up to (exclusive) the stop
            value. Default is `sys.maxsize`.
        step: [Optional] The step to be used (default is 1).
        scheduler: The scheduler to schedule the values on.

    Returns:
        An observable sequence that contains a range of sequential
        integral numbers.
    """

    _stop: int = maxsize if stop is None else stop
    _step: int = 1 if step is None else step

    if step is None and stop is None:
        range_t = range(start)
    elif step is None:
        range_t = range(start, _stop)
    else:
        range_t = range(start, _stop, _step)

    def subscribe(
        observer: abc.ObserverBase[int], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        nonlocal range_t

        _scheduler = scheduler or scheduler_ or CurrentThreadScheduler.singleton()
        sd = MultipleAssignmentDisposable()

        def action(
            scheduler: abc.SchedulerBase, iterator: Optional[Iterator[int]]
        ) -> None:
            try:
                assert iterator
                observer.on_next(next(iterator))
                sd.disposable = _scheduler.schedule(action, state=iterator)
            except StopIteration:
                observer.on_completed()

        sd.disposable = _scheduler.schedule(action, iter(range_t))
        return sd

    return Observable(subscribe)


__all__ = ["range_"]