1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
|
from collections import deque
from datetime import datetime
from typing import Any, Callable, Deque, List, Optional, TypeVar

from reactivex import Notification, Observable, abc
from reactivex import operators as ops
from reactivex import typing
from reactivex.disposable import (
    CompositeDisposable,
    MultipleAssignmentDisposable,
    SerialDisposable,
)
from reactivex.internal.constants import DELTA_ZERO
from reactivex.notification import OnError
from reactivex.scheduler import TimeoutScheduler

from ._timestamp import Timestamp
# Type of the items carried by the source sequence and by its delayed copy.
_T = TypeVar("_T")
def observable_delay_timespan(
    source: Observable[_T],
    duetime: typing.RelativeTime,
    scheduler: Optional[abc.SchedulerBase] = None,
) -> Observable[_T]:
    """Return an observable that replays ``source`` shifted by ``duetime``.

    Each notification of ``source`` is materialized, timestamped and put in
    a queue with its timestamp moved forward by the delay. A scheduled
    action hands the queued notifications to the observer once their
    shifted timestamp has been reached. An error notification is not
    shifted: it discards everything still queued and is forwarded to the
    observer without waiting for the delay.

    Args:
        source: The observable sequence to delay.
        duetime: The delay to apply. A ``datetime`` is handled as well; it
            is converted to the span between the scheduler's current time
            and that instant when the subscription is made.
        scheduler: Scheduler used for timing. When omitted, the scheduler
            given at subscription time is used, falling back to
            ``TimeoutScheduler.singleton()``.

    Returns:
        The time-shifted observable sequence.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ):
        _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

        # Normalize the due time to a span relative to the scheduler's clock.
        if isinstance(duetime, datetime):
            duetime_ = _scheduler.to_datetime(duetime) - _scheduler.now
        else:
            duetime_ = _scheduler.to_timedelta(duetime)

        cancelable = SerialDisposable()
        exception: Optional[Exception] = None
        # One-element lists so the nested closures can flip the flags.
        active = [False]  # a drain action has been scheduled and not finished
        running = [False]  # the drain action is executing right now
        # deque gives O(1) removal at the head (list.pop(0) is O(n)).
        queue: Deque[Timestamp[Notification[_T]]] = deque()

        def on_next(notification: Timestamp[Notification[_T]]) -> None:
            nonlocal exception
            should_run = False

            with source.lock:
                if isinstance(notification.value, OnError):
                    # Errors are not delayed: drop whatever is still pending.
                    queue.clear()
                    queue.append(notification)
                    exception = notification.value.exception
                    should_run = not running[0]
                else:
                    queue.append(
                        Timestamp(
                            value=notification.value,
                            timestamp=notification.timestamp + duetime_,
                        )
                    )
                    should_run = not active[0]
                    active[0] = True

            if should_run:
                # Compare against None explicitly so that an exception object
                # that happens to be falsy is still treated as an error.
                if exception is not None:
                    observer.on_error(exception)
                else:
                    mad = MultipleAssignmentDisposable()
                    cancelable.disposable = mad

                    def action(scheduler: abc.SchedulerBase, state: Any = None):
                        # An error was already forwarded by on_next.
                        if exception is not None:
                            return

                        with source.lock:
                            running[0] = True
                            # Deliver every notification that is already due.
                            while queue and queue[0].timestamp <= scheduler.now:
                                queue.popleft().value.accept(observer)

                            should_continue = False
                            recurse_duetime: typing.RelativeTime = 0
                            if queue:
                                # Come back when the next notification is due.
                                should_continue = True
                                diff = queue[0].timestamp - scheduler.now
                                recurse_duetime = max(DELTA_ZERO, diff)
                            else:
                                active[0] = False

                            # Snapshot the error state before leaving the lock.
                            ex = exception
                            running[0] = False

                        if ex is not None:
                            observer.on_error(ex)
                        elif should_continue:
                            mad.disposable = scheduler.schedule_relative(
                                recurse_duetime, action
                            )

                    mad.disposable = _scheduler.schedule_relative(duetime_, action)

        subscription = source.pipe(
            ops.materialize(),
            ops.timestamp(),
        ).subscribe(on_next, scheduler=_scheduler)

        return CompositeDisposable(subscription, cancelable)

    return Observable(subscribe)
def delay_(
    duetime: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None
) -> Callable[[Observable[_T]], Observable[_T]]:
    """Build a ``delay`` operator bound to ``duetime`` and ``scheduler``.

    Args:
        duetime: The delay handed on to ``observable_delay_timespan``.
        scheduler: Optional scheduler handed on to
            ``observable_delay_timespan``.

    Returns:
        A function that maps a source observable to its delayed version.
    """

    def delay(source: Observable[_T]) -> Observable[_T]:
        """Shift the given observable sequence in time.

        This is the partially applied form of the delay operator.

        Examples:
            >>> res = delay(source)

        Args:
            source: The observable sequence to delay.

        Returns:
            A time-shifted observable sequence.
        """
        shifted = observable_delay_timespan(
            source=source,
            duetime=duetime,
            scheduler=scheduler,
        )
        return shifted

    return delay
# Only the operator factory is exported from this module.
__all__ = ["delay_"]
|