File: replaysubject.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (141 lines) | stat: -rw-r--r-- 4,610 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import sys
from collections import deque
from datetime import datetime, timedelta
from typing import Any, Deque, NamedTuple, Optional, TypeVar, cast

from reactivex.observer.scheduledobserver import ScheduledObserver
from reactivex.scheduler import CurrentThreadScheduler

from .. import abc, typing
from ..observer import Observer
from .subject import Subject

_T = TypeVar("_T")


class RemovableDisposable(abc.DisposableBase):
    """Subscription handle that detaches one observer from a subject.

    Disposing the handle disposes the wrapped observer and, unless the
    subject itself has already been disposed, removes that observer from
    the subject's ``observers`` list.
    """

    def __init__(self, subject: Subject[_T], observer: Observer[_T]):
        """Remember the subject/observer pair to detach on disposal.

        Args:
            subject: Subject whose ``observers`` list may hold ``observer``.
            observer: Observer to dispose and to remove from ``subject``.
        """
        self.subject = subject
        self.observer = observer

    def dispose(self) -> None:
        """Dispose the observer and unregister it from the subject.

        An observer that is no longer registered (for example because the
        subject already cleared its observer list, or because this handle
        was disposed before) is not treated as an error.
        """
        self.observer.dispose()
        if self.subject.is_disposed:
            return

        # EAFP: a single remove() instead of an `in` test followed by
        # remove(). Besides saving one linear scan, this closes the gap in
        # which the list could change between the test and the removal and
        # make remove() raise.
        try:
            self.subject.observers.remove(self.observer)
        except ValueError:
            # Observer was already detached; nothing left to do.
            pass


class QueueItem(NamedTuple):
    """One buffered notification value together with its timestamp.

    Instances are plain immutable tuples ``(interval, value)``.
    """

    # Point in time the value was recorded. Despite the field name this is
    # an absolute datetime, not a duration.
    interval: datetime
    # The buffered value itself; any object is accepted.
    value: Any


class ReplaySubject(Subject[_T]):
    """Represents an object that is both an observable sequence as well
    as an observer. Each notification is broadcasted to all subscribed
    and future observers, subject to buffer trimming policies.

    Values are kept in a queue of timestamped items. The queue is trimmed
    by element count (``buffer_size``) and by age (``window``) whenever a
    notification arrives or a new observer subscribes.
    """

    def __init__(
        self,
        buffer_size: Optional[int] = None,
        window: Optional[typing.RelativeTime] = None,
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> None:
        """Initializes a new instance of the ReplaySubject class with
        the specified buffer size, window and scheduler.

        Args:
            buffer_size: [Optional] Maximum element count of the replay
                buffer. When omitted the count is effectively unlimited
                (``sys.maxsize``). A value of zero or less keeps no
                values for replay.
            window: [Optional] Maximum time length of the replay buffer.
                When omitted no age limit applies (``timedelta.max``).
            scheduler: [Optional] Scheduler the observers are invoked on.
                Defaults to ``CurrentThreadScheduler.singleton()``.
        """

        super().__init__()
        self.buffer_size = sys.maxsize if buffer_size is None else buffer_size
        # The scheduler must be resolved before the window, because the
        # window is normalised through the scheduler's to_timedelta().
        self.scheduler = scheduler or CurrentThreadScheduler.singleton()
        self.window = (
            timedelta.max if window is None else self.scheduler.to_timedelta(window)
        )
        self.queue: Deque[QueueItem] = deque()

    def _subscribe_core(
        self,
        observer: abc.ObserverBase[_T],
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        """Register ``observer`` and replay the buffered notifications.

        The observer is wrapped in a ``ScheduledObserver`` bound to this
        subject's own scheduler. The buffer is trimmed first, then every
        remaining value is handed to the wrapper, followed by the stored
        error or the completion signal if the subject has one.

        Args:
            observer: Observer that subscribes to the subject.
            scheduler: Not used by this implementation; the scheduler
                given to the constructor is always used.

        Returns:
            Disposable that disposes the wrapper and removes it from the
            subject's observer list.
        """
        so = ScheduledObserver(self.scheduler, observer)
        subscription = RemovableDisposable(self, so)

        with self.lock:
            self.check_disposed()
            self._trim(self.scheduler.now)
            self.observers.append(so)

            for item in self.queue:
                so.on_next(item.value)

            # A stored exception takes precedence over plain completion.
            if self.exception is not None:
                so.on_error(self.exception)
            elif self.is_stopped:
                so.on_completed()

        # Activated only after the lock has been released.
        so.ensure_active()
        return subscription

    def _trim(self, now: datetime) -> None:
        """Drop buffered items that exceed the size or age limit.

        All callers in this class invoke it while holding ``self.lock``.

        Args:
            now: Reference time; an item is dropped when ``now`` minus
                its timestamp is greater than ``self.window``.
        """
        # The emptiness test keeps a negative buffer_size from popping an
        # already empty deque, which would raise IndexError.
        while self.queue and len(self.queue) > self.buffer_size:
            self.queue.popleft()

        # Items are appended in arrival order, so only the head is checked.
        while self.queue and (now - self.queue[0].interval) > self.window:
            self.queue.popleft()

    def _on_next_core(self, value: _T) -> None:
        """Notifies all subscribed observers with the value."""

        with self.lock:
            # Snapshot the observers so notification can happen outside
            # the lock.
            observers = self.observers.copy()
            now = self.scheduler.now
            self.queue.append(QueueItem(interval=now, value=value))
            self._trim(now)

        for observer in observers:
            observer.on_next(value)

        for observer in observers:
            cast(ScheduledObserver[_T], observer).ensure_active()

    def _on_error_core(self, error: Exception) -> None:
        """Notifies all subscribed observers with the exception.

        The exception is stored so that observers subscribing later
        receive it after the replayed values.
        """

        with self.lock:
            observers = self.observers.copy()
            self.observers.clear()
            self.exception = error
            now = self.scheduler.now
            self._trim(now)

        for observer in observers:
            observer.on_error(error)
            cast(ScheduledObserver[_T], observer).ensure_active()

    def _on_completed_core(self) -> None:
        """Notifies all subscribed observers of the end of the sequence."""

        with self.lock:
            observers = self.observers.copy()
            self.observers.clear()
            now = self.scheduler.now
            self._trim(now)

        for observer in observers:
            observer.on_completed()
            cast(ScheduledObserver[_T], observer).ensure_active()

    def dispose(self) -> None:
        """Releases all resources used by the current instance of the
        ReplaySubject class and unsubscribe all observers.

        The replay buffer is emptied before the base class ``dispose``
        is invoked.
        """

        with self.lock:
            self.queue.clear()
            super().dispose()