1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
from typing import Any, List, Optional, Tuple
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable
from reactivex.internal.utils import NotSet
def with_latest_from_(
parent: Observable[Any], *sources: Observable[Any]
) -> Observable[Tuple[Any, ...]]:
NO_VALUE = NotSet()
def subscribe(
observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
) -> abc.DisposableBase:
def subscribeall(
parent: Observable[Any], *children: Observable[Any]
) -> List[SingleAssignmentDisposable]:
values = [NO_VALUE for _ in children]
def subscribechild(
i: int, child: Observable[Any]
) -> SingleAssignmentDisposable:
subscription = SingleAssignmentDisposable()
def on_next(value: Any) -> None:
with parent.lock:
values[i] = value
subscription.disposable = child.subscribe(
on_next, observer.on_error, scheduler=scheduler
)
return subscription
parent_subscription = SingleAssignmentDisposable()
def on_next(value: Any) -> None:
with parent.lock:
if NO_VALUE not in values:
result = (value,) + tuple(values)
observer.on_next(result)
children_subscription = [
subscribechild(i, child) for i, child in enumerate(children)
]
disp = parent.subscribe(
on_next, observer.on_error, observer.on_completed, scheduler=scheduler
)
parent_subscription.disposable = disp
return [parent_subscription] + children_subscription
return CompositeDisposable(subscribeall(parent, *sources))
return Observable(subscribe)
__all__ = ["with_latest_from_"]
|