File: _publishvalue.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (32 lines) | stat: -rw-r--r-- 901 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from typing import Callable, Optional, TypeVar, Union

from reactivex import ConnectableObservable, Observable, abc
from reactivex import operators as ops
from reactivex.subject import BehaviorSubject
from reactivex.typing import Mapper

# Element type of the source observable; also the type of ``initial_value``.
_T1 = TypeVar("_T1")
# Element type of the observable returned by the optional ``mapper``.
_T2 = TypeVar("_T2")


def publish_value_(
    initial_value: _T1,
    mapper: Optional[Mapper[Observable[_T1], Observable[_T2]]] = None,
) -> Union[
    Callable[[Observable[_T1]], ConnectableObservable[_T1]],
    Callable[[Observable[_T1]], Observable[_T2]],
]:
    """Build a multicast operator backed by a ``BehaviorSubject``.

    Every ``BehaviorSubject`` created here is constructed with
    ``initial_value``.

    Args:
        initial_value: Value passed to the ``BehaviorSubject`` constructor.
        mapper: Optional function forwarded to ``ops.multicast`` together
            with a subject factory. A falsy ``mapper`` (including the
            default ``None``) is treated as "no mapper".

    Returns:
        Whatever ``ops.multicast`` returns for the chosen arguments. Per the
        annotations this is an operator producing a
        ``ConnectableObservable[_T1]`` when no mapper is used and an
        ``Observable[_T2]`` when one is.
    """
    if not mapper:
        # No mapper: a single subject is built right here, when the operator
        # is created, and handed to multicast as-is.
        return ops.multicast(BehaviorSubject(initial_value))

    def make_subject(
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> BehaviorSubject[_T1]:
        # The scheduler argument is accepted but not used; each call simply
        # produces a new subject seeded with the same initial value.
        return BehaviorSubject(initial_value)

    return ops.multicast(subject_factory=make_subject, mapper=mapper)


# Names exported by a star-import of this module.
__all__ = ["publish_value_"]