File: catchscheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (177 lines) | stat: -rw-r--r-- 6,130 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from datetime import datetime
from typing import Callable, Optional, TypeVar, cast

from reactivex import abc, typing
from reactivex.abc.scheduler import SchedulerBase
from reactivex.disposable import Disposable, SingleAssignmentDisposable

from .periodicscheduler import PeriodicScheduler

# Type of the optional ``state`` value that the schedule methods below accept
# and forward to the scheduled action.
_TState = TypeVar("_TState")


class CatchScheduler(PeriodicScheduler):
    """Scheduler decorator that routes action exceptions through a handler.

    Every action scheduled through this object is forwarded to the wrapped
    scheduler, but is first wrapped so that any ``Exception`` it raises is
    passed to the handler. If the handler returns a truthy value the
    exception is swallowed; otherwise it is re-raised.
    """

    def __init__(
        self, scheduler: abc.SchedulerBase, handler: Callable[[Exception], bool]
    ) -> None:
        """Wraps a scheduler, passed as constructor argument, adding exception
        handling for scheduled actions. The handler should return True to
        indicate it handled the exception successfully. Falsy return values will
        be taken to indicate that the exception should be escalated (raised by
        this scheduler).

        Args:
            scheduler: The scheduler to be wrapped.
            handler: Callable to handle exceptions raised by wrapped scheduler.
        """

        super().__init__()
        self._scheduler: abc.SchedulerBase = scheduler
        self._handler: Callable[[Exception], bool] = handler
        # Cache for _get_recursive_wrapper: the scheduler most recently seen
        # by a wrapped action, and the CatchScheduler built around it.
        self._recursive_original: Optional[abc.SchedulerBase] = None
        self._recursive_wrapper: Optional["CatchScheduler"] = None

    @property
    def now(self) -> datetime:
        """Represents a notion of time for this scheduler. Tasks being
        scheduled on a scheduler will adhere to the time denoted by this
        property.

        Returns:
             The wrapped scheduler's current time, as a datetime instance.
        """

        return self._scheduler.now

    def schedule(
        self, action: typing.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedules an action to be executed.

        Args:
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        action = self._wrap(action)
        return self._scheduler.schedule(action, state=state)

    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed after duetime.

        Args:
            duetime: Relative time after which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        action = self._wrap(action)
        return self._scheduler.schedule_relative(duetime, action, state=state)

    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: typing.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules an action to be executed at duetime.

        Args:
            duetime: Absolute time at which to execute the action.
            action: Action to be executed.
            state: [Optional] state to be given to the action function.

        Returns:
            The disposable object used to cancel the scheduled action
            (best effort).
        """

        action = self._wrap(action)
        return self._scheduler.schedule_absolute(duetime, action, state=state)

    def schedule_periodic(
        self,
        period: typing.RelativeTime,
        action: typing.ScheduledPeriodicAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedules a periodic piece of work.

        The first exception raised by the action is passed to the handler.
        If the handler accepts it, the periodic subscription is disposed;
        otherwise the exception is re-raised. In both cases later
        invocations of the periodic callback do nothing.

        Args:
            period: Period in seconds or timedelta for running the
                work periodically.
            action: Action to be executed.
            state: [Optional] Initial state passed to the action upon
                the first iteration.

        Returns:
            The disposable object used to cancel the scheduled
            recurring action (best effort).

        Raises:
            NotImplementedError: If the wrapped scheduler does not provide
                a callable ``schedule_periodic``.
        """

        # Default to None so that a wrapped scheduler without the attribute
        # yields NotImplementedError instead of leaking an AttributeError.
        schedule_periodic = getattr(self._scheduler, "schedule_periodic", None)
        if not callable(schedule_periodic):
            raise NotImplementedError

        disp: SingleAssignmentDisposable = SingleAssignmentDisposable()
        failed: bool = False

        def periodic(state: Optional[_TState] = None) -> Optional[_TState]:
            nonlocal failed
            # After the first failure every further tick is a no-op.
            if failed:
                return None
            try:
                return action(state)
            except Exception as ex:
                # Mark as failed before consulting the handler so the flag
                # is set whether the handler accepts, rejects, or raises.
                failed = True
                if not self._handler(ex):
                    raise
                # Handled: cancel the periodic subscription.
                disp.dispose()
                return None

        # The callable check above established that schedule_periodic exists;
        # the cast only informs the type checker.
        scheduler = cast(PeriodicScheduler, self._scheduler)
        disp.disposable = scheduler.schedule_periodic(period, periodic, state=state)
        return disp

    def _clone(self, scheduler: abc.SchedulerBase) -> "CatchScheduler":
        """Builds a new CatchScheduler around ``scheduler`` with this
        instance's handler.

        Args:
            scheduler: The scheduler to be wrapped.

        Returns:
            A new CatchScheduler sharing this instance's handler.
        """

        return CatchScheduler(scheduler, self._handler)

    def _wrap(
        self, action: typing.ScheduledAction[_TState]
    ) -> typing.ScheduledAction[_TState]:
        """Wraps an action so that exceptions it raises go to the handler.

        The returned action invokes ``action`` with a CatchScheduler wrapped
        around the scheduler it receives (see ``_get_recursive_wrapper``)
        rather than with that scheduler directly.

        Args:
            action: Action to be wrapped.

        Returns:
            An action that calls ``action`` and, when it raises an
            ``Exception``, either returns an empty ``Disposable`` (handler
            returned a truthy value) or re-raises.
        """

        # The inner function's first parameter is also named ``self`` (it
        # receives the scheduler invoking the action), so keep a separate
        # reference to this CatchScheduler.
        parent = self

        def wrapped_action(
            self: abc.SchedulerBase, state: Optional[_TState]
        ) -> Optional[abc.DisposableBase]:
            try:
                return action(parent._get_recursive_wrapper(self), state)
            except Exception as ex:
                if not parent._handler(ex):
                    raise
                return Disposable()

        return wrapped_action

    def _get_recursive_wrapper(self, scheduler: SchedulerBase) -> "CatchScheduler":
        """Returns a CatchScheduler wrapping ``scheduler``, reusing the
        cached one when possible.

        A new wrapper is created when none is cached yet or when
        ``scheduler`` differs from the one the cached wrapper was built for.
        The new wrapper's own cache is pre-filled to point at itself, so
        asking it for a wrapper of the same scheduler returns it again.

        Args:
            scheduler: The scheduler passed to a wrapped action.

        Returns:
            The cached or newly created CatchScheduler for ``scheduler``.
        """

        if self._recursive_wrapper is None or self._recursive_original != scheduler:
            self._recursive_original = scheduler
            wrapper = self._clone(scheduler)
            wrapper._recursive_original = scheduler
            wrapper._recursive_wrapper = wrapper
            self._recursive_wrapper = wrapper

        return self._recursive_wrapper