File: behaviorsubject.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (69 lines) | stat: -rw-r--r-- 1,997 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
from typing import Optional, TypeVar, cast

from .. import abc
from ..disposable import Disposable
from .innersubscription import InnerSubscription
from .subject import Subject

# Type of the values carried (and cached) by a BehaviorSubject.
_T = TypeVar("_T")


class BehaviorSubject(Subject[_T]):
    """Represents a value that changes over time. Observers can
    subscribe to the subject to receive the last (or initial) value and
    all subsequent notifications.
    """

    def __init__(self, value: _T) -> None:
        """Initializes a new instance of the BehaviorSubject class which
        creates a subject that caches its last value and starts with the
        specified value.

        Args:
            value: Initial value sent to observers when no other value has been
                received by the subject yet.
        """

        super().__init__()

        # Most recent value; replayed to every observer that subscribes
        # while the subject has not been stopped.
        self.value: _T = value

    def _subscribe_core(
        self,
        observer: abc.ObserverBase[_T],
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        """Subscribes an observer and replays the cached value to it.

        While the subject is not stopped, the observer is added to the
        list of observers and immediately receives the cached value.
        Once the subject is stopped, the observer only receives the
        terminal notification: ``on_error`` with the stored exception if
        there is one, ``on_completed`` otherwise.

        Args:
            observer: Observer to subscribe.
            scheduler: Not used by this method.

        Returns:
            An ``InnerSubscription`` tying the observer to this subject
            while the subject is active, otherwise a new ``Disposable``.
        """
        with self.lock:
            self.check_disposed()
            if not self.is_stopped:
                self.observers.append(observer)
                # The value is read under the same lock that
                # _on_next_core holds while updating it.
                observer.on_next(self.value)
                return InnerSubscription(self, observer)
            ex = self.exception

        # Compare against None instead of relying on truthiness: an
        # exception object may be falsy (its class can define __bool__
        # or __len__), and a failed subject must never be reported to a
        # late subscriber as completed.
        # NOTE(review): assumes the base class stores None when no error
        # occurred -- confirm against Subject.
        if ex is not None:
            observer.on_error(ex)
        else:
            observer.on_completed()

        return Disposable()

    def _on_next_core(self, value: _T) -> None:
        """Notifies all subscribed observers with the value.

        The value is also stored as the new cached value.

        Args:
            value: Value to cache and send to the observers.
        """
        with self.lock:
            # Snapshot the observer list and update the cached value
            # under the lock; the callbacks below run outside of it.
            observers = self.observers.copy()
            self.value = value

        # Iterating over the copy means changes made to self.observers
        # during delivery do not affect this round of notifications.
        for observer in observers:
            observer.on_next(value)

    def dispose(self) -> None:
        """Release all resources.

        Releases all resources used by the current instance of the
        BehaviorSubject class and unsubscribe all observers.
        """

        with self.lock:
            # Drop the reference to the cached value. cast() has no
            # runtime effect; it only keeps the declared type of
            # self.value acceptable to type checkers.
            self.value = cast(_T, None)
            # NOTE(review): the base dispose() is called while the lock
            # is held; if it takes the same lock, this relies on the
            # lock being re-entrant -- confirm against Subject.
            super().dispose()