1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
|
from typing import Any, Callable, Optional, TypeVar
from reactivex import Observable, abc
from reactivex.scheduler import CurrentThreadScheduler
# Generic element type: the type of `value` / of the supplier's return value,
# and therefore of the items carried by the returned Observable.
_T = TypeVar("_T")
def return_value_(
    value: _T, scheduler: Optional[abc.SchedulerBase] = None
) -> Observable[_T]:
    """Build an observable sequence that emits a single element.

    The element is delivered from an action scheduled on a scheduler:
    the observer receives ``on_next(value)`` followed immediately by
    ``on_completed()``.

    The original documentation notes that an alias called 'just' exists.

    Examples:
        >>> res = return_value_(42)
        >>> res = return_value_(42, CurrentThreadScheduler.singleton())

    Args:
        value: Single element in the resulting observable sequence.
        scheduler: Optional scheduler used to send out the observer
            messages. When given (and truthy) it wins over the scheduler
            supplied at subscription time.

    Returns:
        An observable sequence containing the single specified
        element.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        def emit(_: abc.SchedulerBase, __: Any = None) -> None:
            # The scheduler/state arguments passed to the action are unused.
            observer.on_next(value)
            observer.on_completed()

        # Precedence: scheduler given at creation, then the one given at
        # subscription, then the shared CurrentThreadScheduler instance.
        chosen = scheduler or scheduler_ or CurrentThreadScheduler.singleton()
        return chosen.schedule(emit)

    return Observable(subscribe)
def from_callable_(
    supplier: Callable[[], _T], scheduler: Optional[abc.SchedulerBase] = None
) -> Observable[_T]:
    """Build an observable sequence from the result of calling ``supplier``.

    The call happens inside an action scheduled on a scheduler. The value
    returned by ``supplier()`` is passed to ``on_next`` and then
    ``on_completed`` is signalled. Any ``Exception`` raised while doing so
    is forwarded to the observer through ``on_error``.

    Args:
        supplier: Zero-argument callable producing the single element.
        scheduler: Optional scheduler used to run the call. When given
            (and truthy) it wins over the scheduler supplied at
            subscription time.

    Returns:
        An observable sequence wrapping the scheduled call to
        ``supplier``.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        def invoke(_: abc.SchedulerBase, __: Any = None) -> None:
            # NOTE: the try block deliberately spans the observer callbacks
            # as well as the supplier call, so an exception raised by
            # on_next/on_completed is also routed to on_error.
            try:
                produced = supplier()
                observer.on_next(produced)
                observer.on_completed()
            except Exception as error:  # pylint: disable=broad-except
                observer.on_error(error)

        # Precedence: scheduler given at creation, then the one given at
        # subscription, then the shared CurrentThreadScheduler instance.
        chosen = scheduler or scheduler_ or CurrentThreadScheduler.singleton()
        return chosen.schedule(invoke)

    return Observable(subscribe)
# Names exported by a star-import of this module.
__all__ = ["return_value_", "from_callable_"]
|