File: autodetachobserver.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (65 lines) | stat: -rw-r--r-- 1,710 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
from typing import Optional, TypeVar

from reactivex.disposable import SingleAssignmentDisposable
from reactivex.internal import default_error, noop

from .. import abc, typing

# Type of the values the observer consumes. It is declared contravariant
# because, in this module, it only appears in input positions (the ``on_next``
# callback type and the ``on_next`` method parameter).
_T_in = TypeVar("_T_in", contravariant=True)


class AutoDetachObserver(abc.ObserverBase[_T_in]):
    """Observer that forwards to callbacks and detaches itself on termination.

    Values are passed to the ``on_next`` callback until the observer is
    stopped. Every terminal path (``on_error``, ``on_completed``, ``fail``)
    marks the observer as stopped, invokes the matching callback and then
    disposes the subscription held in ``_subscription``. ``dispose`` stops
    the observer and disposes the subscription without calling any callback.
    """

    def __init__(
        self,
        on_next: Optional[typing.OnNext[_T_in]] = None,
        on_error: Optional[typing.OnError] = None,
        on_completed: Optional[typing.OnCompleted] = None,
    ) -> None:
        """Store the callbacks and create an empty subscription holder.

        Args:
            on_next: Callback invoked with each value. ``noop`` is used when
                the argument is omitted (or otherwise falsy).
            on_error: Callback invoked with the error. ``default_error`` is
                used when the argument is omitted (or otherwise falsy).
            on_completed: Callback invoked on completion. ``noop`` is used
                when the argument is omitted (or otherwise falsy).
        """
        self._on_next = on_next or noop
        self._on_error = on_error or default_error
        self._on_completed = on_completed or noop

        # Holder for the upstream subscription; it is filled in later through
        # ``set_disposable`` / the ``subscription`` property.
        self._subscription = SingleAssignmentDisposable()
        # Once True, every further notification is ignored.
        self.is_stopped = False

    def on_next(self, value: _T_in) -> None:
        """Forward ``value`` to the ``on_next`` callback unless stopped."""
        if self.is_stopped:
            return
        self._on_next(value)

    def on_error(self, error: Exception) -> None:
        """Stop the observer, report ``error`` and dispose the subscription.

        Does nothing if the observer is already stopped. The subscription is
        disposed even when the error callback itself raises; that exception
        still propagates to the caller.
        """
        if self.is_stopped:
            return
        self.is_stopped = True

        try:
            self._on_error(error)
        finally:
            self.dispose()

    def on_completed(self) -> None:
        """Stop the observer, signal completion and dispose the subscription.

        Does nothing if the observer is already stopped. The subscription is
        disposed even when the completion callback raises; that exception
        still propagates to the caller.
        """
        if self.is_stopped:
            return
        self.is_stopped = True

        try:
            self._on_completed()
        finally:
            self.dispose()

    def set_disposable(self, value: abc.DisposableBase) -> None:
        """Assign ``value`` as the subscription to dispose on termination."""
        self._subscription.disposable = value

    # Write-only property: ``observer.subscription = d`` calls
    # ``set_disposable(d)``; no getter is defined.
    subscription = property(fset=set_disposable)

    def dispose(self) -> None:
        """Mark the observer as stopped and dispose the held subscription."""
        self.is_stopped = True
        self._subscription.dispose()

    def fail(self, exn: Exception) -> bool:
        """Report ``exn`` to the error callback if the observer is still live.

        Args:
            exn: The exception to hand to the error callback.

        Returns:
            ``False`` when the observer was already stopped (nothing is
            called), ``True`` after the error callback has returned.
        """
        if self.is_stopped:
            return False

        self.is_stopped = True
        try:
            self._on_error(exn)
        finally:
            # Same guarantee as on_error/on_completed: a terminal notification
            # always releases the subscription, even if the callback raises.
            self.dispose()
        return True