1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
|
from asyncio import Future
from typing import Callable, TypeVar, Union
import reactivex
from reactivex import Observable, abc
_T = TypeVar("_T")
def if_then_(
condition: Callable[[], bool],
then_source: Union[Observable[_T], "Future[_T]"],
else_source: Union[None, Observable[_T], "Future[_T]"] = None,
) -> Observable[_T]:
"""Determines whether an observable collection contains values.
Example:
1 - res = reactivex.if_then(condition, obs1)
2 - res = reactivex.if_then(condition, obs1, obs2)
Args:
condition: The condition which determines if the then_source or
else_source will be run.
then_source: The observable sequence or Promise that
will be run if the condition function returns true.
else_source: [Optional] The observable sequence or
Promise that will be run if the condition function returns
False. If this is not provided, it defaults to
reactivex.empty
Returns:
An observable sequence which is either the then_source or
else_source.
"""
else_source_: Union[Observable[_T], "Future[_T]"] = else_source or reactivex.empty()
then_source = (
reactivex.from_future(then_source)
if isinstance(then_source, Future)
else then_source
)
else_source_ = (
reactivex.from_future(else_source_)
if isinstance(else_source_, Future)
else else_source_
)
def factory(_: abc.SchedulerBase) -> Union[Observable[_T], "Future[_T]"]:
return then_source if condition() else else_source_
return reactivex.defer(factory)
__all__ = ["if_then_"]
|