File: _delay.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (141 lines) | stat: -rw-r--r-- 4,722 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from collections import deque
from datetime import datetime
from typing import Any, Callable, Deque, List, Optional, TypeVar

from reactivex import Notification, Observable, abc
from reactivex import operators as ops
from reactivex import typing
from reactivex.disposable import (
    CompositeDisposable,
    MultipleAssignmentDisposable,
    SerialDisposable,
)
from reactivex.internal.constants import DELTA_ZERO
from reactivex.notification import OnError
from reactivex.scheduler import TimeoutScheduler

from ._timestamp import Timestamp

# Element type shared by the source observable and the delayed observable
# built from it in this module's signatures.
_T = TypeVar("_T")


def observable_delay_timespan(
    source: Observable[_T],
    duetime: typing.RelativeTime,
    scheduler: Optional[abc.SchedulerBase] = None,
) -> Observable[_T]:
    """Build an observable that replays ``source`` shifted later in time.

    The notifications of ``source`` are materialized and timestamped, queued
    with their timestamp moved forward by the due time, and replayed to the
    subscriber by a scheduled action once that time has been reached. An
    error is not delayed: it discards whatever is still queued and is
    forwarded to the subscriber without waiting for the due time.

    Args:
        source: The observable sequence to delay.
        duetime: Delay to apply. A ``datetime`` is turned, at subscription
            time, into the span between the scheduler's ``now`` and that
            instant; any other value goes through the scheduler's
            ``to_timedelta``.
        scheduler: Scheduler used for timing. When omitted, the scheduler
            passed at subscription is used and, failing that, the
            ``TimeoutScheduler`` singleton.

    Returns:
        An observable that emits the notifications of ``source`` after the
        delay.
    """

    def subscribe(
        observer: abc.ObserverBase[_T], scheduler_: Optional[abc.SchedulerBase] = None
    ):
        _scheduler = scheduler or scheduler_ or TimeoutScheduler.singleton()

        # Normalize the due time to a timedelta relative to "now".
        if isinstance(duetime, datetime):
            duetime_ = _scheduler.to_datetime(duetime) - _scheduler.now
        else:
            duetime_ = _scheduler.to_timedelta(duetime)

        cancelable = SerialDisposable()
        exception: Optional[Exception] = None
        # One-element lists act as mutable cells shared with the closures.
        active = [False]  # a drain action is scheduled or still has work
        running = [False]  # a drain action is executing right now
        # Pending notifications in arrival order; a deque gives O(1) pops
        # from the head while draining.
        queue: Deque[Timestamp[Notification[_T]]] = deque()

        def on_next(notification: Timestamp[Notification[_T]]) -> None:
            nonlocal exception
            should_run = False

            with source.lock:
                if isinstance(notification.value, OnError):
                    # An error supersedes everything still waiting.
                    queue.clear()
                    queue.append(notification)
                    exception = notification.value.exception
                    should_run = not running[0]
                else:
                    queue.append(
                        Timestamp(
                            value=notification.value,
                            timestamp=notification.timestamp + duetime_,
                        )
                    )
                    should_run = not active[0]
                    active[0] = True

            if should_run:
                # Compare against None instead of relying on truthiness: an
                # exception type may define __len__/__bool__ and be falsy.
                if exception is not None:
                    observer.on_error(exception)
                else:
                    mad = MultipleAssignmentDisposable()
                    cancelable.disposable = mad

                    def action(scheduler: abc.SchedulerBase, state: Any = None):
                        # Nothing to replay once an error has been recorded.
                        if exception is not None:
                            return

                        with source.lock:
                            running[0] = True
                            # Replay every notification whose due time has
                            # been reached.
                            while queue and queue[0].timestamp <= scheduler.now:
                                queue.popleft().value.accept(observer)

                            should_continue = False
                            recurse_duetime: typing.RelativeTime = 0
                            if queue:
                                # Come back when the next notification is due.
                                should_continue = True
                                diff = queue[0].timestamp - scheduler.now
                                recurse_duetime = max(DELTA_ZERO, diff)
                            else:
                                active[0] = False

                            # Snapshot under the lock; acted upon outside it.
                            ex = exception
                            running[0] = False

                        if ex is not None:
                            observer.on_error(ex)
                        elif should_continue:
                            mad.disposable = scheduler.schedule_relative(
                                recurse_duetime, action
                            )

                    mad.disposable = _scheduler.schedule_relative(duetime_, action)

        subscription = source.pipe(
            ops.materialize(),
            ops.timestamp(),
        ).subscribe(on_next, scheduler=_scheduler)

        return CompositeDisposable(subscription, cancelable)

    return Observable(subscribe)


def delay_(
    duetime: typing.RelativeTime, scheduler: Optional[abc.SchedulerBase] = None
) -> Callable[[Observable[_T]], Observable[_T]]:
    """Create a ``delay`` operator bound to ``duetime`` and ``scheduler``."""

    def delay(source: Observable[_T]) -> Observable[_T]:
        """Shift the given observable sequence in time.

        This is the delay operator with its due time and scheduler
        already applied; only the source remains to be supplied.

        Examples:
            >>> res = delay(source)

        Args:
            source: The observable sequence whose notifications are delayed.

        Returns:
            The observable sequence shifted in time.
        """
        delayed = observable_delay_timespan(
            source, duetime=duetime, scheduler=scheduler
        )
        return delayed

    return delay


# Names exported by `from ... import *`: only the operator factory.
__all__ = ["delay_"]