File: groupedobservable.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (40 lines) | stat: -rw-r--r-- 1,284 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from typing import Generic, Optional, TypeVar

from reactivex import abc
from reactivex.disposable import CompositeDisposable, Disposable, RefCountDisposable

from .observable import Observable

_T = TypeVar("_T")
_TKey = TypeVar("_TKey")


class GroupedObservable(Generic[_TKey, _T], Observable[_T]):
    def __init__(
        self,
        key: _TKey,
        underlying_observable: Observable[_T],
        merged_disposable: Optional[RefCountDisposable] = None,
    ):
        super().__init__()
        self.key = key

        def subscribe(
            observer: abc.ObserverBase[_T],
            scheduler: Optional[abc.SchedulerBase] = None,
        ) -> abc.DisposableBase:
            return CompositeDisposable(
                merged_disposable.disposable if merged_disposable else Disposable(),
                underlying_observable.subscribe(observer, scheduler=scheduler),
            )

        self.underlying_observable = (
            underlying_observable if not merged_disposable else Observable(subscribe)
        )

    def _subscribe_core(
        self,
        observer: abc.ObserverBase[_T],
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        return self.underlying_observable.subscribe(observer, scheduler=scheduler)