File: subject.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (72 lines) | stat: -rw-r--r-- 1,907 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from abc import abstractmethod
from typing import Optional, TypeVar, Union

from .disposable import DisposableBase
from .observable import ObservableBase
from .observer import ObserverBase, OnCompleted, OnError, OnNext
from .scheduler import SchedulerBase

# Type of the elements handled by a subject; it parameterizes both base
# classes of SubjectBase and the value accepted by ``on_next``.
_T = TypeVar("_T")


class SubjectBase(ObserverBase[_T], ObservableBase[_T]):
    """Subject abstract base class.

    Represents an object that is both an observable sequence as well
    as an observer. It inherits from both ``ObserverBase`` and
    ``ObservableBase`` and declares ``subscribe``, ``on_next``,
    ``on_error`` and ``on_completed`` as abstract methods; the
    implementations given here only raise ``NotImplementedError``.
    """

    # Empty slots: this class itself declares no instance attributes.
    __slots__ = ()

    @abstractmethod
    def subscribe(
        self,
        on_next: Optional[Union[OnNext[_T], ObserverBase[_T]]] = None,
        on_error: Optional[OnError] = None,
        on_completed: Optional[OnCompleted] = None,
        *,
        scheduler: Optional[SchedulerBase] = None,
    ) -> DisposableBase:
        """Subscribe an observer to the observable sequence.

        As allowed by the type annotations, the first argument may be
        either an observer object or an ``OnNext`` callback.

        Args:
            on_next: [Optional] Either an ``OnNext`` callback or an
                observer object (``ObserverBase``) that is to receive
                notifications.
            on_error: [Optional] An ``OnError`` callback; presumably
                invoked when the sequence signals an error.
            on_completed: [Optional] An ``OnCompleted`` callback;
                presumably invoked when the sequence completes.
            scheduler: [Optional] The default scheduler to use for this
                subscription. Keyword-only.

        Returns:
            Disposable object representing an observer's subscription
            to the observable sequence.

        Raises:
            NotImplementedError: Always, in this abstract base
                implementation.
        """

        raise NotImplementedError

    @abstractmethod
    def on_next(self, value: _T) -> None:
        """Notifies the observer of a new element in the sequence.

        Args:
            value: The received element.

        Raises:
            NotImplementedError: Always, in this abstract base
                implementation.
        """

        raise NotImplementedError

    @abstractmethod
    def on_error(self, error: Exception) -> None:
        """Notifies the observer that an exception has occurred.

        Args:
            error: The error that has occurred.

        Raises:
            NotImplementedError: Always, in this abstract base
                implementation.
        """

        raise NotImplementedError

    @abstractmethod
    def on_completed(self) -> None:
        """Notifies the observer of the end of the sequence.

        Raises:
            NotImplementedError: Always, in this abstract base
                implementation.
        """

        raise NotImplementedError


# Names exported by ``from <this module> import *``.
__all__ = ["SubjectBase"]