File: case.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (35 lines) | stat: -rw-r--r-- 893 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from asyncio import Future
from typing import Callable, Mapping, Optional, TypeVar, Union

from reactivex import Observable, abc, defer, empty, from_future

# Type of the value returned by the mapper; also the key type of `sources`.
_Key = TypeVar("_Key")
# Type parameter shared by the observables/futures accepted and returned below.
_T = TypeVar("_T")


def case_(
    mapper: Callable[[], _Key],
    sources: Mapping[_Key, Observable[_T]],
    default_source: Optional[Union[Observable[_T], "Future[_T]"]] = None,
) -> Observable[_T]:
    """Select one source from a mapping using a lazily computed key.

    The selection is wrapped in a factory handed to ``defer``, so ``mapper``
    is not called when ``case_`` itself is called; it is called each time
    that factory runs.

    Args:
        mapper: Zero-argument callable returning the key to look up in
            ``sources``.
        sources: Mapping from keys to the candidate sources.
        default_source: Source used when the key produced by ``mapper`` is
            not present in ``sources``. When ``None`` (the default), an
            ``empty()`` observable is used instead. May be a ``Future``, in
            which case it is converted with ``from_future``.

    Returns:
        The observable produced by ``defer`` around the selection factory.

    Note:
        Only a ``KeyError`` raised by the lookup ``sources[key]`` selects the
        default source. An exception raised by ``mapper`` itself, including a
        ``KeyError``, propagates out of the factory; how it surfaces to
        subscribers is determined by ``defer``.
    """
    # Compare against None explicitly so that a caller-supplied source is
    # never discarded merely because it happens to evaluate as falsy.
    fallback: Union[Observable[_T], "Future[_T]"] = (
        empty() if default_source is None else default_source
    )

    def factory(_: abc.SchedulerBase) -> Observable[_T]:
        # Evaluate the mapper outside the ``try`` so that a KeyError raised
        # by the mapper is not mistaken for a missing key in ``sources``.
        key = mapper()

        chosen: Union[Observable[_T], "Future[_T]"]
        try:
            chosen = sources[key]
        except KeyError:
            chosen = fallback

        # Futures (from the mapping or the default) are adapted to observables.
        if isinstance(chosen, Future):
            return from_future(chosen)
        return chosen

    return defer(factory)


# Public API of this module: only `case_` is exported.
__all__ = ["case_"]