File: observable.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (45 lines) | stat: -rw-r--r-- 1,334 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from abc import ABC, abstractmethod
from typing import Callable, Generic, Optional, TypeVar, Union

from .disposable import DisposableBase
from .observer import ObserverBase, OnCompleted, OnError, OnNext
from .scheduler import SchedulerBase

# Covariant type variable used as the generic parameter of ObservableBase
# and in the Subscription alias defined in this module.
_T_out = TypeVar("_T_out", covariant=True)


class ObservableBase(Generic[_T_out], ABC):
    """Observable abstract base class.

    Represents a push-style collection.

    This class only declares the interface: `subscribe` is abstract, so
    concrete observables must override it before they can be
    instantiated.
    """

    # Empty slots: this base class itself declares no instance attributes.
    __slots__ = ()

    @abstractmethod
    def subscribe(
        self,
        on_next: Optional[Union[OnNext[_T_out], ObserverBase[_T_out]]] = None,
        on_error: Optional[OnError] = None,
        on_completed: Optional[OnCompleted] = None,
        *,
        scheduler: Optional[SchedulerBase] = None,
    ) -> DisposableBase:
        """Subscribe an observer to the observable sequence.

        Per the signature, the receiver of notifications can be given
        either as a single observer object in the first argument or as
        separate callbacks.

        Args:
            on_next: [Optional] Either an `OnNext` callback or an
                `ObserverBase` instance that is to receive
                notifications.
            on_error: [Optional] An `OnError` callback.
            on_completed: [Optional] An `OnCompleted` callback.
            scheduler: [Optional] The default scheduler to use for this
                subscription. Keyword-only.

        NOTE(review): how `on_error` / `on_completed` are treated when
        `on_next` is an observer object is decided by the implementing
        subclass; it is not specified by this declaration.

        Returns:
            Disposable object representing an observer's subscription
            to the observable sequence.

        Raises:
            NotImplementedError: Always, in this abstract declaration;
                subclasses supply the actual behaviour.
        """

        raise NotImplementedError


# Type alias for a subscribe-style callable: it takes an observer and an
# optional scheduler (which may be None) and returns a disposable.
# The alias is generic in _T_out through ObserverBase[_T_out].
Subscription = Callable[[ObserverBase[_T_out], Optional[SchedulerBase]], DisposableBase]

# Names exported as this module's public API.
__all__ = ["ObservableBase", "Subscription"]