File: scheduler.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (172 lines) | stat: -rw-r--r-- 5,400 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
from abc import abstractmethod
from datetime import datetime, timedelta, timezone
from typing import Optional, TypeVar

from reactivex import abc, typing
from reactivex.disposable import Disposable
from reactivex.internal.basic import default_now
from reactivex.internal.constants import UTC_ZERO

# Type variable for the optional ``state`` value that a scheduler hands to a
# scheduled action; it ties ``abc.ScheduledAction[_TState]`` to the matching
# ``state: Optional[_TState]`` parameter in the method signatures below.
_TState = TypeVar("_TState")


class Scheduler(abc.SchedulerBase):
    """Common base for the scheduler implementations of this package and of
    the mainloop sub-package.

    Periodic scheduling (schedule_periodic) is not implemented here; refer to
    PeriodicScheduler for that.
    """

    @property
    def now(self) -> datetime:
        """The scheduler's notion of the current time. Tasks scheduled on a
        scheduler adhere to the time denoted by this property.

        Returns:
             The value produced by default_now(), as a datetime instance.
        """

        return default_now()

    @abstractmethod
    def schedule(
        self, action: abc.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Schedule an action for execution.

        Abstract: concrete schedulers must override this method. The base
        body only returns the NotImplemented sentinel.

        Args:
            action: The action to execute.
            state: [Optional] state handed to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_relative(
        self,
        duetime: typing.RelativeTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action for execution once duetime has elapsed.

        Abstract: concrete schedulers must override this method. The base
        body only returns the NotImplemented sentinel.

        Args:
            duetime: Relative time after which the action is executed.
            action: The action to execute.
            state: [Optional] state handed to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    @abstractmethod
    def schedule_absolute(
        self,
        duetime: typing.AbsoluteTime,
        action: abc.ScheduledAction[_TState],
        state: Optional[_TState] = None,
    ) -> abc.DisposableBase:
        """Schedule an action for execution at the absolute time duetime.

        Abstract: concrete schedulers must override this method. The base
        body only returns the NotImplemented sentinel.

        Args:
            duetime: Absolute time at which the action is executed.
            action: The action to execute.
            state: [Optional] state handed to the action function.

        Returns:
            A disposable that can be used to cancel the scheduled action
            (best effort).
        """

        return NotImplemented

    def invoke_action(
        self, action: abc.ScheduledAction[_TState], state: Optional[_TState] = None
    ) -> abc.DisposableBase:
        """Invoke the given action with this scheduler and the given state.
        This is typically called by instances of ScheduledItem.

        Args:
            action: The action to execute; called as action(self, state).
            state: [Optional] state handed to the action function.

        Returns:
            Whatever the action returned when that is a DisposableBase
            instance; a fresh (no-op) Disposable in every other case.
        """

        outcome = action(self, state)
        # Actions may return nothing (or something that is not disposable);
        # callers still always get a disposable back.
        return outcome if isinstance(outcome, abc.DisposableBase) else Disposable()

    @classmethod
    def to_seconds(cls, value: typing.AbsoluteOrRelativeTime) -> float:
        """Convert a time value to seconds. Absolute (datetime) values are
        measured from UTC_ZERO, relative (timedelta) values are converted
        with total_seconds(). Any other argument (e.g. a float) is returned
        unchanged.

        Args:
            value: The time value to convert to seconds.

        Returns:
            The value expressed in seconds.
        """

        # Step 1: reduce an absolute time to its offset from UTC_ZERO.
        offset = value - UTC_ZERO if isinstance(value, datetime) else value
        # Step 2: flatten a timedelta to seconds; pass anything else through.
        return offset.total_seconds() if isinstance(offset, timedelta) else offset

    @classmethod
    def to_datetime(cls, value: typing.AbsoluteOrRelativeTime) -> datetime:
        """Convert a time value to a datetime. Relative (timedelta) values
        are added to UTC_ZERO, datetime arguments are returned unchanged, and
        anything else (e.g. a float) is interpreted as a timestamp and
        converted with datetime.fromtimestamp using the UTC timezone.

        Args:
            value: The time value to convert to datetime.

        Returns:
            The value expressed as a datetime.
        """

        if isinstance(value, timedelta):
            return UTC_ZERO + value

        if isinstance(value, datetime):
            return value

        return datetime.fromtimestamp(value, tz=timezone.utc)

    @classmethod
    def to_timedelta(cls, value: typing.AbsoluteOrRelativeTime) -> timedelta:
        """Convert a time value to a timedelta. Absolute (datetime) values
        become the timedelta elapsed since UTC_ZERO (the epoch, January 1st,
        1970, 00:00:00), timedelta arguments are returned unchanged, and
        anything else (e.g. a float) is taken as a number of seconds.

        Args:
            value: The time value to convert to timedelta.

        Returns:
            The value expressed as a timedelta.
        """

        if isinstance(value, datetime):
            return value - UTC_ZERO

        if isinstance(value, timedelta):
            return value

        return timedelta(seconds=value)