File: defer.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (43 lines) | stat: -rw-r--r-- 1,408 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from asyncio import Future
from typing import Callable, Optional, TypeVar, Union

from reactivex import Observable, abc, from_future, throw
from reactivex.scheduler import ImmediateScheduler

_T = TypeVar("_T")


def defer_(
    factory: Callable[[abc.SchedulerBase], Union[Observable[_T], "Future[_T]"]]
) -> Observable[_T]:
    """Returns an observable sequence that invokes the specified factory
    function whenever a new observer subscribes.

    Example:
        >>> res = defer(lambda scheduler: of(1, 2, 3))

    Args:
        observable_factory: Observable factory function to invoke for
        each observer that subscribes to the resulting sequence. The
        factory takes a single argument, the scheduler used.

    Returns:
        An observable sequence whose observers trigger an invocation
        of the given observable factory function.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler: Optional[abc.SchedulerBase] = None
    ) -> abc.DisposableBase:
        try:
            result = factory(scheduler or ImmediateScheduler.singleton())
        except Exception as ex:  # By design. pylint: disable=W0703
            return throw(ex).subscribe(observer)

        result = from_future(result) if isinstance(result, Future) else result
        return result.subscribe(observer, scheduler=scheduler)

    return Observable(subscribe)


__all__ = ["defer_"]