1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
import asyncio
from asyncio import Future
from typing import Callable, Optional, TypeVar, cast
from reactivex import Observable, abc
from reactivex.internal.exceptions import SequenceContainsNoElementsError
_T = TypeVar("_T")
def to_future_(
future_ctor: Optional[Callable[[], "Future[_T]"]] = None,
scheduler: Optional[abc.SchedulerBase] = None,
) -> Callable[[Observable[_T]], "Future[_T]"]:
future_ctor_: Callable[[], "Future[_T]"] = (
future_ctor or asyncio.get_event_loop().create_future
)
future: "Future[_T]" = future_ctor_()
def to_future(source: Observable[_T]) -> "Future[_T]":
"""Converts an existing observable sequence to a Future.
If the observable emits a single item, then this item is set as the
result of the future. If the observable emits a sequence of items, then
the last emitted item is set as the result of the future.
Example:
future = reactivex.return_value(42).pipe(ops.to_future(asyncio.Future))
Args:
future_ctor: [Optional] The constructor of the future.
Returns:
A future with the last value from the observable sequence.
"""
has_value = False
last_value = cast(_T, None)
def on_next(value: _T):
nonlocal last_value
nonlocal has_value
last_value = value
has_value = True
def on_error(err: Exception):
if not future.cancelled():
future.set_exception(err)
def on_completed():
nonlocal last_value
if not future.cancelled():
if has_value:
future.set_result(last_value)
else:
future.set_exception(SequenceContainsNoElementsError())
last_value = None
dis = source.subscribe(on_next, on_error, on_completed, scheduler=scheduler)
future.add_done_callback(lambda _: dis.dispose())
return future
return to_future
__all__ = ["to_future_"]
|