1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
from typing import Any, Iterable, Optional, TypeVar
from reactivex import Observable, abc
from reactivex.disposable import (
CompositeDisposable,
Disposable,
SerialDisposable,
SingleAssignmentDisposable,
)
from reactivex.scheduler import CurrentThreadScheduler
# Generic element type shared by the source observables and the resulting
# observable (see the ``Observable[_T]`` annotations below).
_T = TypeVar("_T")
def concat_with_iterable_(sources: Iterable[Observable[_T]]) -> Observable[_T]:
    """Build an observable that plays the observables in ``sources`` in sequence.

    On subscription the iterable is walked one item at a time: each source is
    subscribed to, its items and errors are forwarded to the observer, and its
    completion schedules the step that pulls the next source. When the
    iterable is exhausted the observer is completed; if advancing the iterable
    raises, the exception is delivered through ``observer.on_error``.

    Args:
        sources: Iterable yielding the observables to concatenate.

    Returns:
        An observable wrapping the sequential subscription logic.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        # Scheduler used to drive the "advance to next source" steps; falls
        # back to the shared CurrentThreadScheduler when none is supplied.
        step_scheduler = scheduler_ or CurrentThreadScheduler.singleton()
        remaining = iter(sources)

        # Holds the subscription to whichever source is currently active.
        inner_subscription = SerialDisposable()
        # Holds the most recently scheduled advance step.
        pending_step = SerialDisposable()
        disposed = False

        def mark_disposed() -> None:
            nonlocal disposed
            disposed = True

        def step(scheduler: abc.SchedulerBase, state: Any = None) -> None:
            # Once the outer subscription is disposed, stop advancing.
            if disposed:
                return

            try:
                upcoming = next(remaining)
            except StopIteration:
                # No more sources: the concatenation is finished.
                observer.on_completed()
                return
            except Exception as error:  # pylint: disable=broad-except
                # Failure while advancing the iterable is reported downstream.
                observer.on_error(error)
                return

            def schedule_next() -> None:
                # The current source completed; queue the step for the next.
                pending_step.disposable = step_scheduler.schedule(step)

            # Register the holder before subscribing so that the serial
            # disposable already tracks it while the subscription is made.
            holder = SingleAssignmentDisposable()
            inner_subscription.disposable = holder
            # The inner source receives the caller's original scheduler
            # argument (possibly None), not the fallback chosen above.
            holder.disposable = upcoming.subscribe(
                observer.on_next,
                observer.on_error,
                schedule_next,
                scheduler=scheduler_,
            )

        # Kick off the first step.
        pending_step.disposable = step_scheduler.schedule(step)

        return CompositeDisposable(
            inner_subscription, pending_step, Disposable(mark_disposed)
        )

    return Observable(subscribe)
# Names exported by a star-import of this module.
__all__ = ["concat_with_iterable_"]
|