1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
|
from typing import Optional, TypeVar
from reactivex.disposable import SingleAssignmentDisposable
from reactivex.internal import default_error, noop
from .. import abc, typing
# Type variable for the values an observer receives; declared contravariant.
_T_in = TypeVar("_T_in", contravariant=True)
class AutoDetachObserver(abc.ObserverBase[_T_in]):
    """Observer that disposes its subscription once the stream terminates.

    Notifications are forwarded to the callbacks supplied at construction.
    As soon as an error or completion has been handed to its callback, or
    ``dispose`` has been called, the observer is flagged as stopped and
    silently ignores every later notification.
    """

    def __init__(
        self,
        on_next: Optional[typing.OnNext[_T_in]] = None,
        on_error: Optional[typing.OnError] = None,
        on_completed: Optional[typing.OnCompleted] = None,
    ) -> None:
        """Store the callbacks and create the subscription holder.

        Args:
            on_next: Callback invoked with each value. A falsy argument
                (such as ``None``) is replaced by ``noop``.
            on_error: Callback invoked with the error. A falsy argument
                is replaced by ``default_error``.
            on_completed: Callback invoked on completion. A falsy
                argument is replaced by ``noop``.
        """
        # The observer accepts notifications until a terminal event or
        # an explicit dispose() flips this flag.
        self.is_stopped = False
        # Holder for the upstream disposable, filled in later through
        # ``set_disposable`` / the ``subscription`` property.
        self._subscription = SingleAssignmentDisposable()

        self._on_next = on_next or noop
        self._on_error = on_error or default_error
        self._on_completed = on_completed or noop

    def on_next(self, value: _T_in) -> None:
        """Pass ``value`` to the ``on_next`` callback unless stopped."""
        if not self.is_stopped:
            self._on_next(value)

    def on_error(self, error: Exception) -> None:
        """Deliver ``error`` to the error callback, then dispose.

        Does nothing when the observer is already stopped. The observer
        is marked stopped before the callback runs, and ``dispose`` is
        called even if the callback itself raises.
        """
        if not self.is_stopped:
            self.is_stopped = True
            try:
                self._on_error(error)
            finally:
                self.dispose()

    def on_completed(self) -> None:
        """Invoke the completion callback, then dispose.

        Does nothing when the observer is already stopped. The observer
        is marked stopped before the callback runs, and ``dispose`` is
        called even if the callback itself raises.
        """
        if not self.is_stopped:
            self.is_stopped = True
            try:
                self._on_completed()
            finally:
                self.dispose()

    def set_disposable(self, value: abc.DisposableBase) -> None:
        """Hand ``value`` to the internal subscription holder."""
        self._subscription.disposable = value

    # Write-only property: ``observer.subscription = d`` is routed to
    # ``set_disposable``; no getter is defined.
    subscription = property(fset=set_disposable)

    def dispose(self) -> None:
        """Mark the observer as stopped and dispose the subscription."""
        self.is_stopped = True
        self._subscription.dispose()

    def fail(self, exn: Exception) -> bool:
        """Report ``exn`` to the error callback if not already stopped.

        Unlike ``on_error``, this method does not call ``dispose``.

        Returns:
            ``True`` when the error callback was invoked, ``False`` when
            the observer had already stopped.
        """
        if not self.is_stopped:
            self.is_stopped = True
            self._on_error(exn)
            return True
        return False
|