1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
|
from typing import Callable, Optional, TypeVar, Union
from reactivex import ConnectableObservable, Observable, abc
from reactivex import operators as ops
from reactivex.disposable import CompositeDisposable
_TSource = TypeVar("_TSource")
_TResult = TypeVar("_TResult")
def multicast_(
subject: Optional[abc.SubjectBase[_TSource]] = None,
*,
subject_factory: Optional[
Callable[[Optional[abc.SchedulerBase]], abc.SubjectBase[_TSource]]
] = None,
mapper: Optional[Callable[[Observable[_TSource]], Observable[_TResult]]] = None,
) -> Callable[
[Observable[_TSource]], Union[Observable[_TResult], ConnectableObservable[_TSource]]
]:
"""Multicasts the source sequence notifications through an
instantiated subject into all uses of the sequence within a mapper
function. Each subscription to the resulting sequence causes a
separate multicast invocation, exposing the sequence resulting from
the mapper function's invocation. For specializations with fixed
subject types, see Publish, PublishLast, and Replay.
Examples:
>>> res = multicast(observable)
>>> res = multicast(
subject_factory=lambda scheduler: Subject(),
mapper=lambda x: x
)
Args:
subject_factory: Factory function to create an intermediate
subject through which the source sequence's elements will be
multicast to the mapper function.
subject: Subject to push source elements into.
mapper: [Optional] Mapper function which can use the
multicasted source sequence subject to the policies enforced
by the created subject. Specified only if subject_factory"
is a factory function.
Returns:
An observable sequence that contains the elements of a sequence
produced by multicasting the source sequence within a mapper
function.
"""
def multicast(
source: Observable[_TSource],
) -> Union[Observable[_TResult], ConnectableObservable[_TSource]]:
if subject_factory:
def subscribe(
observer: abc.ObserverBase[_TResult],
scheduler: Optional[abc.SchedulerBase] = None,
) -> abc.DisposableBase:
assert subject_factory
connectable = source.pipe(
ops.multicast(subject=subject_factory(scheduler))
)
assert mapper
subscription = mapper(connectable).subscribe(
observer, scheduler=scheduler
)
return CompositeDisposable(subscription, connectable.connect(scheduler))
return Observable(subscribe)
if not subject:
raise ValueError("multicast: Subject cannot be None")
ret: ConnectableObservable[_TSource] = ConnectableObservable(source, subject)
return ret
return multicast
|