1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
|
from asyncio import Future
from typing import Callable, Optional, TypeVar, Union
import reactivex
from reactivex import Observable, abc
from reactivex.disposable import (
CompositeDisposable,
SerialDisposable,
SingleAssignmentDisposable,
)
from reactivex.scheduler import CurrentThreadScheduler
# Element type shared by the source sequences and the resulting sequence.
_T = TypeVar("_T")
def on_error_resume_next_(
    *sources: Union[
        Observable[_T], "Future[_T]", Callable[[Optional[Exception]], Observable[_T]]
    ]
) -> Observable[_T]:
    """Continues an observable sequence that is terminated normally or
    by an exception with the next observable sequence.

    Examples:
        >>> res = reactivex.on_error_resume_next(xs, ys, zs)

    Args:
        sources: The sequences to chain, in order. Each entry may be an
            observable, a future (wrapped with ``reactivex.from_future``),
            or a callable. A callable is invoked with the exception that
            terminated the previous sequence (``None`` if it completed
            normally, or for the first entry) and its return value is
            used as the next sequence.

    Returns:
        An observable sequence that concatenates the source sequences,
        even if a sequence terminates exceptionally.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        scheduler = scheduler or CurrentThreadScheduler.singleton()

        # The iterator is created per subscription. Sharing a single
        # iterator between subscriptions would leave it exhausted after
        # the first one, so any later subscriber would complete
        # immediately without ever seeing a source.
        sources_ = iter(sources)

        # Holds the subscription to whichever source is currently active.
        subscription = SerialDisposable()
        # Holds the initially scheduled action.
        cancelable = SerialDisposable()

        def action(
            scheduler: abc.SchedulerBase, state: Optional[Exception] = None
        ) -> None:
            try:
                source = next(sources_)
            except StopIteration:
                # No sources left: the combined sequence is finished.
                observer.on_completed()
                return

            # Allow source to be a factory method taking an error
            source = source(state) if callable(source) else source
            current = (
                reactivex.from_future(source) if isinstance(source, Future) else source
            )

            # Install the placeholder before subscribing so that a source
            # which terminates during subscribe() cannot have its
            # successor's subscription overwritten afterwards.
            d = SingleAssignmentDisposable()
            subscription.disposable = d

            def on_resume(state: Optional[Exception] = None) -> None:
                # Used for both on_error (state is the exception) and
                # on_completed (state stays None): move to the next source.
                scheduler.schedule(action, state)

            d.disposable = current.subscribe(
                observer.on_next, on_resume, on_resume, scheduler=scheduler
            )

        cancelable.disposable = scheduler.schedule(action)
        return CompositeDisposable(subscription, cancelable)

    return Observable(subscribe)
# Names exported by a star-import of this module.
__all__ = ["on_error_resume_next_"]
|