File: fromfuture.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (49 lines) | stat: -rw-r--r-- 1,405 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio
from asyncio import Future
from typing import Any, Optional, TypeVar, cast

from reactivex import Observable, abc
from reactivex.disposable import Disposable

_T = TypeVar("_T")


def from_future_(future: "Future[_T]") -> Observable[_T]:
    """Wrap a Future in an Observable sequence.

    Each subscription registers a done-callback on the given future.
    When the future finishes, its result is pushed to the observer as a
    single item followed by completion; if retrieving the result raises
    (including cancellation), the exception is forwarded to the
    observer's error channel instead.

    Disposing the subscription calls ``cancel()`` on the wrapped future.

    Args:
        future -- A Python 3 compatible future.
            https://docs.python.org/3/library/asyncio-task.html#future

    Returns:
        An Observable sequence that mirrors the success or failure of
        the wrapped future.
    """

    def subscribe(
        observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        def on_settled(settled: "Future[_T]") -> None:
            # Pull the outcome first; only a successfully retrieved value
            # reaches the on_next/on_completed pair at the bottom.
            try:
                outcome: Any = settled.result()
            except Exception as error:
                observer.on_error(error)
                return
            except asyncio.CancelledError as cancelled:  # pylint: disable=broad-except
                # CancelledError may derive from BaseException rather than
                # Exception, so it gets its own clause and is cast to
                # satisfy the on_error signature.
                observer.on_error(cast(Exception, cancelled))
                return

            observer.on_next(outcome)
            observer.on_completed()

        def cancel_future() -> None:
            # Invoked on disposal: request cancellation of the wrapped future.
            if not future:
                return
            future.cancel()

        future.add_done_callback(on_settled)
        return Disposable(cancel_future)

    return Observable(subscribe)


# Explicit export list: `from_future_` is the only public name of this module.
__all__ = ["from_future_"]