1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
from typing import Any, Optional, TypeVar, cast
from reactivex import Observable, abc, typing
from reactivex.disposable import MultipleAssignmentDisposable
from reactivex.scheduler import CurrentThreadScheduler
# Type of the state value that generate_ tests, emits and iterates.
_TState = TypeVar("_TState")
def generate_(
    initial_state: _TState,
    condition: typing.Predicate[_TState],
    iterate: typing.Mapper[_TState, _TState],
) -> Observable[_TState]:
    """Create an observable driven by a state-iteration loop.

    Starting from ``initial_state``, the current state is tested with
    ``condition``. While the test is truthy the state is emitted and, on the
    next scheduled run, replaced by ``iterate(state)``. The first falsy test
    completes the sequence.

    Args:
        initial_state: State tested (and possibly emitted) on the first run.
        condition: Predicate applied to the current state; a falsy result
            ends the sequence with ``on_completed``.
        iterate: Maps the current state to the next one; applied on every
            run except the first.

    Returns:
        An ``Observable`` wrapping the subscribe function below. Every
        subscription starts over from ``initial_state``.
    """

    def subscribe(
        observer: abc.ObserverBase[_TState],
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        """Schedule the first iteration step and return its disposable."""
        # Use the shared current-thread scheduler when the caller gave none.
        _scheduler = scheduler or CurrentThreadScheduler.singleton()
        is_first = True
        current = initial_state
        mad = MultipleAssignmentDisposable()

        # NOTE: ``state1`` is not used here; presumably it is kept so the
        # callable fits the scheduler's action signature.
        def step(scheduler: abc.SchedulerBase, state1: Any = None) -> None:
            """Run one iteration: advance, test, then emit or complete."""
            nonlocal is_first, current

            should_emit = False
            value: _TState = cast(_TState, None)

            # Only user callbacks run inside the try block, so a failure in
            # ``iterate`` or ``condition`` is reported through ``on_error``.
            try:
                if is_first:
                    # The initial state is tested as-is, without iterating.
                    is_first = False
                else:
                    current = iterate(current)

                should_emit = condition(current)
                if should_emit:
                    value = current
            except Exception as exception:  # pylint: disable=broad-except
                observer.on_error(exception)
                return

            if not should_emit:
                observer.on_completed()
                return

            observer.on_next(value)
            # Queue the next step on the scheduler that ran this one and keep
            # its disposable in ``mad``.
            mad.disposable = scheduler.schedule(step)

        mad.disposable = _scheduler.schedule(step)
        return mad

    return Observable(subscribe)
# Names exported by ``from <module> import *``.
__all__ = ["generate_"]
|