# switch_latest operator: mirror only the most recent inner observable (or future) of a higher-order source.
from asyncio import Future
from typing import Any, Callable, Optional, TypeVar, Union
from reactivex import Observable, abc, from_future
from reactivex.disposable import (
CompositeDisposable,
SerialDisposable,
SingleAssignmentDisposable,
)
# Type of the items carried by the inner observables/futures and by the resulting observable.
_T = TypeVar("_T")
def switch_latest_() -> Callable[
    [Observable[Union[Observable[_T], "Future[_T]"]]], Observable[_T]
]:
    """Create the ``switch_latest`` operator.

    Returns:
        A function that maps an observable of observables (or futures)
        to an observable that only follows the most recently received
        inner sequence.
    """

    def switch_latest(
        source: Observable[Union[Observable[_T], "Future[_T]"]]
    ) -> Observable[_T]:
        """Follow only the newest inner sequence of a higher-order source.

        Every item of ``source`` is itself an observable (or a future,
        which is converted with ``from_future``). As soon as a new inner
        sequence arrives it becomes the current one, and notifications
        from earlier inner sequences are no longer forwarded.

        Errors from ``source`` are passed straight to the observer;
        errors from the current inner sequence are forwarded as well.
        The result completes once ``source`` has completed and no inner
        sequence is still active.

        Args:
            source: Observable emitting inner observables or futures.

        Returns:
            An observable sequence that at any point in time produces
            the elements of the most recent inner observable sequence
            that has been received.
        """

        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            # Holds the subscription of the inner sequence currently followed.
            inner_slot = SerialDisposable()
            # Bumped for every inner sequence; lets stale handlers recognise
            # that they have been superseded.
            generation = 0
            # True while the current inner sequence has not completed.
            inner_active = False
            # True once the outer source has completed.
            outer_done = False

            def outer_next(
                inner_source: Union[Observable[_T], "Future[_T]"]
            ) -> None:
                nonlocal generation, inner_active

                handle = SingleAssignmentDisposable()
                with source.lock:
                    generation += 1
                    ticket = generation
                inner_active = True
                inner_slot.disposable = handle

                # Futures are wrapped so both kinds are subscribed uniformly.
                obs = (
                    from_future(inner_source)
                    if isinstance(inner_source, Future)
                    else inner_source
                )

                def inner_next(x: Any) -> None:
                    if generation != ticket:
                        return  # superseded by a newer inner sequence
                    observer.on_next(x)

                def inner_error(e: Exception) -> None:
                    if generation != ticket:
                        return
                    observer.on_error(e)

                def inner_completed() -> None:
                    nonlocal inner_active
                    if generation != ticket:
                        return
                    inner_active = False
                    # Only finish when the outer source is done too.
                    if outer_done:
                        observer.on_completed()

                handle.disposable = obs.subscribe(
                    inner_next, inner_error, inner_completed, scheduler=scheduler
                )

            def outer_completed() -> None:
                nonlocal outer_done
                outer_done = True
                # A still-running inner sequence keeps the output alive.
                if inner_active:
                    return
                observer.on_completed()

            outer_subscription = source.subscribe(
                outer_next, observer.on_error, outer_completed, scheduler=scheduler
            )
            return CompositeDisposable(outer_subscription, inner_slot)

        return Observable(subscribe)

    return switch_latest
# Only the operator factory is exported from this module.
__all__ = ["switch_latest_"]