File: _refcount.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (46 lines) | stat: -rw-r--r-- 1,419 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
from typing import Callable, Optional, TypeVar

from reactivex import ConnectableObservable, Observable, abc
from reactivex.disposable import Disposable

_T = TypeVar("_T")


def ref_count_() -> Callable[[ConnectableObservable[_T]], Observable[_T]]:
    """Returns an observable sequence that stays connected to the
    source as long as there is at least one subscription to the
    observable sequence.
    """

    connectable_subscription: Optional[abc.DisposableBase] = None
    count = 0

    def ref_count(source: ConnectableObservable[_T]) -> Observable[_T]:
        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            nonlocal connectable_subscription, count

            count += 1
            should_connect = count == 1
            subscription = source.subscribe(observer, scheduler=scheduler)
            if should_connect:
                connectable_subscription = source.connect(scheduler)

            def dispose() -> None:
                nonlocal connectable_subscription, count

                subscription.dispose()
                count -= 1
                if not count and connectable_subscription:
                    connectable_subscription.dispose()

            return Disposable(dispose)

        return Observable(subscribe)

    return ref_count


__all__ = ["ref_count_"]