1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
|
from typing import Any, Iterable, Optional, TypeVar
from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable
from reactivex.scheduler import CurrentThreadScheduler
# Element type shared by the source iterable and the resulting observable.
_T = TypeVar("_T")
def from_iterable_(
    iterable: Iterable[_T], scheduler: Optional[abc.SchedulerBase] = None
) -> Observable[_T]:
    """Converts an iterable to an observable sequence.

    Example:
        >>> from_iterable([1,2,3])

    Args:
        iterable: A Python iterable
        scheduler: An optional scheduler to schedule the values on. When
            given, it takes precedence over the scheduler supplied at
            subscription time.

    Returns:
        The observable sequence whose elements are pulled from the
        given iterable sequence.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        # Precedence: operator-level scheduler, then the subscribe-time
        # scheduler, then the current-thread singleton.
        _scheduler = scheduler or scheduler_ or CurrentThreadScheduler.singleton()
        # A fresh iterator is requested from the iterable on every subscription.
        iterator = iter(iterable)
        disposed = False

        def action(_: abc.SchedulerBase, __: Any = None) -> None:
            """Drain the iterator into the observer until exhausted or disposed."""
            exhausted = False
            try:
                while not disposed:
                    # Only a StopIteration coming from the iterator itself
                    # means "end of sequence". Keeping this handler narrow
                    # prevents a StopIteration raised inside
                    # observer.on_next from being mistaken for completion.
                    try:
                        value = next(iterator)
                    except StopIteration:
                        exhausted = True
                        break
                    observer.on_next(value)
            except Exception as error:  # pylint: disable=broad-except
                # Errors from the iterator or from on_next terminate the
                # sequence with on_error.
                observer.on_error(error)
                return

            # Called outside the try block so that an exception raised by
            # on_completed is not redirected to on_error. When the loop ended
            # because of disposal, no terminal notification is sent.
            if exhausted:
                observer.on_completed()

        def dispose() -> None:
            # Flip the flag polled by the loop in `action` so it stops emitting.
            nonlocal disposed
            disposed = True

        disp = Disposable(dispose)
        return CompositeDisposable(_scheduler.schedule(action), disp)

    return Observable(subscribe)
# Names exported by `from <module> import *`.
__all__ = ["from_iterable_"]
|