1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
import threading
from typing import Any, List, TypeVar
from reactivex import abc, typing
from reactivex.disposable import SerialDisposable
from .observer import Observer
_T_in = TypeVar("_T_in", contravariant=True)
class ScheduledObserver(Observer[_T_in]):
def __init__(
self, scheduler: abc.SchedulerBase, observer: abc.ObserverBase[_T_in]
) -> None:
super().__init__()
self.scheduler = scheduler
self.observer = observer
self.lock = threading.RLock()
self.is_acquired = False
self.has_faulted = False
self.queue: List[typing.Action] = []
self.disposable = SerialDisposable()
# Note to self: list append is thread safe
# http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm
def _on_next_core(self, value: Any) -> None:
def action() -> None:
self.observer.on_next(value)
self.queue.append(action)
def _on_error_core(self, error: Exception) -> None:
def action() -> None:
self.observer.on_error(error)
self.queue.append(action)
def _on_completed_core(self) -> None:
def action() -> None:
self.observer.on_completed()
self.queue.append(action)
def ensure_active(self) -> None:
is_owner = False
with self.lock:
if not self.has_faulted and self.queue:
is_owner = not self.is_acquired
self.is_acquired = True
if is_owner:
self.disposable.disposable = self.scheduler.schedule(self.run)
def run(self, scheduler: abc.SchedulerBase, state: Any) -> None:
parent = self
with self.lock:
if parent.queue:
work = parent.queue.pop(0)
else:
parent.is_acquired = False
return
try:
work()
except Exception:
with self.lock:
parent.queue = []
parent.has_faulted = True
raise
self.scheduler.schedule(self.run)
def dispose(self) -> None:
super().dispose()
self.disposable.dispose()
|