1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
from asyncio import Future
from typing import Callable, Mapping, Optional, TypeVar, Union
from reactivex import Observable, abc, defer, empty, from_future
# Type of the lookup key: returned by the mapper and used to index `sources`.
_Key = TypeVar("_Key")
# Element type parameterizing the source observables and the default source.
_T = TypeVar("_T")
def case_(
    mapper: Callable[[], _Key],
    sources: Mapping[_Key, Observable[_T]],
    default_source: Optional[Union[Observable[_T], "Future[_T]"]] = None,
) -> Observable[_T]:
    """Build a deferred observable that picks its source by key lookup.

    Nothing is selected when ``case_`` itself is called: ``mapper`` is only
    invoked from the factory handed to ``defer``, so the choice is made
    whenever ``defer`` runs that factory.

    Args:
        mapper: Zero-argument callable returning the key to look up in
            ``sources``.
        sources: Mapping from keys to the observable to use for that key.
        default_source: Observable or future used when ``sources[key]``
            raises ``KeyError``. When ``None``, ``empty()`` is used.

    Returns:
        The observable produced by ``defer`` around the selection logic.
    """
    # Compare against None explicitly so that a caller-supplied default is
    # never replaced just because the object happens to evaluate as falsy.
    fallback: Union[Observable[_T], "Future[_T]"] = (
        empty() if default_source is None else default_source
    )

    def factory(_: abc.SchedulerBase) -> Observable[_T]:
        # Call the mapper outside the try block: a KeyError raised by the
        # mapper itself is a failure of the mapper, not a "no such case"
        # signal, and must not be silently converted into the default source.
        key = mapper()

        selected: Union[Observable[_T], "Future[_T]"]
        try:
            selected = sources[key]
        except KeyError:
            selected = fallback

        # Futures are wrapped so the factory always returns an observable.
        if isinstance(selected, Future):
            return from_future(selected)
        return selected

    return defer(factory)
# Names exported by `from <module> import *`; only the operator itself.
__all__ = ["case_"]
|