File: returnvalue.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (64 lines) | stat: -rw-r--r-- 1,933 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
from typing import Any, Callable, Optional, TypeVar

from reactivex import Observable, abc
from reactivex.scheduler import CurrentThreadScheduler

_T = TypeVar("_T")


def return_value_(
    value: _T, scheduler: Optional[abc.SchedulerBase] = None
) -> Observable[_T]:
    """Returns an observable sequence that contains a single element,
    using the specified scheduler to send out observer messages.
    There is an alias called 'just'.

    Examples:
        >>> res = return(42)
        >>> res = return(42, rx.Scheduler.timeout)

    Args:
        value: Single element in the resulting observable sequence.

    Returns:
        An observable sequence containing the single specified
        element.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        _scheduler = scheduler or scheduler_ or CurrentThreadScheduler.singleton()

        def action(scheduler: abc.SchedulerBase, state: Any = None) -> None:
            observer.on_next(value)
            observer.on_completed()

        return _scheduler.schedule(action)

    return Observable(subscribe)


def from_callable_(
    supplier: Callable[[], _T], scheduler: Optional[abc.SchedulerBase] = None
) -> Observable[_T]:
    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        _scheduler = scheduler or scheduler_ or CurrentThreadScheduler.singleton()

        def action(_: abc.SchedulerBase, __: Any = None) -> None:
            nonlocal observer

            try:
                observer.on_next(supplier())
                observer.on_completed()
            except Exception as e:  # pylint: disable=broad-except
                observer.on_error(e)

        return _scheduler.schedule(action)

    return Observable(subscribe)


__all__ = ["return_value_", "from_callable_"]