File: scheduledobserver.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (81 lines) | stat: -rw-r--r-- 2,189 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import threading
from typing import Any, List, TypeVar

from reactivex import abc, typing
from reactivex.disposable import SerialDisposable

from .observer import Observer

# Type of the values the observer below receives; declared contravariant.
_T_in = TypeVar("_T_in", contravariant=True)


class ScheduledObserver(Observer[_T_in]):
    """Observer that relays notifications to another observer via a scheduler.

    Every notification is wrapped in a zero-argument action and appended to
    ``queue``. The ``_on_*_core`` hooks in this class only enqueue; draining
    starts when ``ensure_active`` is called. From then on ``run`` executes one
    queued action per scheduled invocation and re-schedules itself until it
    finds the queue empty.
    """

    def __init__(
        self, scheduler: abc.SchedulerBase, observer: abc.ObserverBase[_T_in]
    ) -> None:
        """Set up an idle observer with an empty queue.

        Args:
            scheduler: Scheduler on which the queued actions are run.
            observer: Observer that receives the relayed notifications.
        """
        super().__init__()

        self.scheduler = scheduler
        self.observer = observer

        # Guards is_acquired / has_faulted and the pop/reset of the queue.
        self.lock = threading.RLock()
        # True from the moment a drain is started until run() finds the
        # queue empty.
        self.is_acquired = False
        # Set once a queued action has raised; no new drain is started after.
        self.has_faulted = False
        # Pending actions, oldest first.
        self.queue: List[typing.Action] = []
        # Holds the disposable returned by the schedule call in ensure_active.
        self.disposable = SerialDisposable()

        # The enqueue hooks append without taking the lock, relying on
        # list.append being thread safe:
        # http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm

    def _on_next_core(self, value: Any) -> None:
        """Queue a call to the wrapped observer's ``on_next`` with ``value``."""
        self.queue.append(lambda: self.observer.on_next(value))

    def _on_error_core(self, error: Exception) -> None:
        """Queue a call to the wrapped observer's ``on_error`` with ``error``."""
        self.queue.append(lambda: self.observer.on_error(error))

    def _on_completed_core(self) -> None:
        """Queue a call to the wrapped observer's ``on_completed``."""
        self.queue.append(lambda: self.observer.on_completed())

    def ensure_active(self) -> None:
        """Start draining the queue unless a drain is already in progress.

        Does nothing when the observer has faulted or the queue is empty.
        Otherwise the observer is marked as acquired, and only the caller that
        flipped the flag schedules ``run``.
        """
        with self.lock:
            if self.has_faulted or not self.queue:
                return
            already_acquired = self.is_acquired
            self.is_acquired = True

        # Scheduling happens outside the lock.
        if not already_acquired:
            self.disposable.disposable = self.scheduler.schedule(self.run)

    def run(self, scheduler: abc.SchedulerBase, state: Any) -> None:
        """Execute the oldest queued action, then schedule the next run.

        Written as a scheduler action; neither argument is used, and the
        follow-up run is scheduled on ``self.scheduler``.

        Args:
            scheduler: Scheduler invoking the action (unused).
            state: State passed by the scheduler (unused).

        Raises:
            Exception: Whatever the executed action raised. Before it is
                re-raised the queue is emptied and ``has_faulted`` is set.
        """
        with self.lock:
            if not self.queue:
                # Nothing left: release ownership so a later ensure_active()
                # can start a new drain.
                self.is_acquired = False
                return
            next_action = self.queue.pop(0)

        # The action itself runs outside the lock.
        try:
            next_action()
        except Exception:
            with self.lock:
                self.queue = []
                self.has_faulted = True
            raise
        else:
            self.scheduler.schedule(self.run)

    def dispose(self) -> None:
        """Dispose the base observer, then the held scheduling disposable."""
        super().dispose()
        self.disposable.dispose()