1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
from typing import Callable, Optional, TypeVar
from reactivex import ConnectableObservable, Observable, abc
from reactivex.disposable import Disposable
# Type variable for the element type carried by the observables below.
_T = TypeVar("_T")
def ref_count_() -> Callable[[ConnectableObservable[_T]], Observable[_T]]:
    """Build an operator that connects a connectable source on demand.

    The returned operator wraps a connectable observable in a plain
    observable. The source is connected when a subscription brings the
    subscriber count to one, and the connection is disposed when a
    disposal brings the count back to zero.

    Returns:
        A function that maps a connectable observable to an observable
        whose subscriptions drive the connection to the source.
    """
    # Both values live in the factory scope, so they are shared by every
    # observable produced by the operator returned from this call.
    connectable_subscription: Optional[abc.DisposableBase] = None
    count = 0

    def ref_count(source: ConnectableObservable[_T]) -> Observable[_T]:
        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            nonlocal connectable_subscription, count

            count += 1
            # Decide from the freshly incremented count, before the
            # observer is attached to the source.
            first_subscriber = count == 1

            # The observer is subscribed before the source is connected
            # (presumably so the first observer does not miss items
            # emitted on connect -- confirm against the connectable
            # implementation).
            source_subscription = source.subscribe(observer, scheduler=scheduler)
            if first_subscriber:
                connectable_subscription = source.connect(scheduler)

            def release() -> None:
                nonlocal count

                source_subscription.dispose()
                count -= 1
                # Keep the connection while subscribers remain, or when
                # there is no connection to tear down.
                if count or not connectable_subscription:
                    return
                connectable_subscription.dispose()

            return Disposable(release)

        return Observable(subscribe)

    return ref_count
# Names exported by a wildcard import of this module.
__all__ = ["ref_count_"]
|