1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
|
from typing import Any, List, Optional, TypeVar
from reactivex import Notification, Observable, abc
from reactivex.disposable import CompositeDisposable, Disposable
from reactivex.scheduler import VirtualTimeScheduler
from .recorded import Recorded
from .subscription import Subscription
_T = TypeVar("_T")
class ColdObservable(Observable[_T]):
    """Observable that replays a fixed list of recorded messages per subscriber.

    Every call to ``_subscribe_core`` schedules all recorded messages on the
    virtual-time scheduler, each at its own time relative to the moment of
    subscription, and appends a ``Subscription`` record to
    ``self.subscriptions``.
    """

    def __init__(
        self, scheduler: VirtualTimeScheduler, messages: List[Recorded[_T]]
    ) -> None:
        """Store the scheduler and messages and start with no subscriptions.

        Args:
            scheduler: Virtual-time scheduler used to schedule the messages
                and to read the current time.
            messages: Recorded entries; each one's ``value`` must be a
                ``Notification`` by the time something subscribes.
        """
        super().__init__()
        self.scheduler = scheduler
        self.messages = messages
        # One entry is appended per subscription; the entry is replaced
        # with a completed one when that subscription is disposed.
        self.subscriptions: List[Subscription] = []

    def _subscribe_core(
        self,
        observer: Optional[abc.ObserverBase[_T]] = None,
        scheduler: Optional[abc.SchedulerBase] = None,
    ) -> abc.DisposableBase:
        """Record a subscription and schedule every message for ``observer``.

        Args:
            observer: Receiver of the notifications. When it is falsy the
                actions are still scheduled but deliver nothing.
            scheduler: Not used; ``self.scheduler`` is always used instead.

        Returns:
            A disposable that, when disposed, rewrites this subscription's
            record with its end time and cancels the scheduled actions.

        Raises:
            ValueError: If a message's ``value`` is not a ``Notification``.
                Messages preceding the offending one are already scheduled
                at that point.
        """
        # Position this subscription's record will occupy once appended.
        slot = len(self.subscriptions)
        self.subscriptions.append(Subscription(self.scheduler.clock))

        scheduled = CompositeDisposable()

        def make_action(item: Notification[_T]) -> abc.ScheduledAction[_T]:
            # A factory is used so that each action captures its own
            # notification rather than the loop variable.
            def action(
                scheduler: abc.SchedulerBase, state: Any = None
            ) -> abc.DisposableBase:
                if observer:
                    item.accept(observer)
                return Disposable()

            return action

        for recorded in self.messages:
            payload = recorded.value
            if not isinstance(payload, Notification):
                raise ValueError("Must be notification")

            handle = self.scheduler.schedule_relative(
                recorded.time, make_action(payload)
            )
            scheduled.add(handle)

        def dispose() -> None:
            # Keep the original start time and stamp the end time before
            # cancelling the actions.
            started_at = self.subscriptions[slot].subscribe
            ended_at = self.scheduler.to_seconds(self.scheduler.now)
            self.subscriptions[slot] = Subscription(started_at, int(ended_at))
            scheduled.dispose()

        return Disposable(dispose)
|