File: schedule.py

package info (click to toggle)
python-tempora 5.7.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 240 kB
  • sloc: python: 636; sh: 12; makefile: 7
file content (204 lines) | stat: -rw-r--r-- 5,452 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
Classes for calling functions a schedule. Has time zone support.

For example, to run a job at 08:00 every morning in 'Asia/Calcutta':

>>> import zoneinfo
>>> job = lambda: print("time is now", datetime.datetime())
>>> time = datetime.time(8, tzinfo=zoneinfo.ZoneInfo('Asia/Calcutta'))
>>> cmd = PeriodicCommandFixedDelay.daily_at(time, job)
>>> sched = InvokeScheduler()
>>> sched.add(cmd)
>>> while True:  # doctest: +SKIP
...     sched.run_pending()
...     time.sleep(.1)

By default, the scheduler uses timezone-aware times in UTC. A
client may override the default behavior by overriding ``now``
and ``from_timestamp`` functions.

>>> now()
datetime.datetime(...utc)
>>> from_timestamp(1718723533.7685602)
datetime.datetime(...utc)
"""

import datetime
import numbers
import abc
import bisect

from .utc import now, fromtimestamp as from_timestamp


class DelayedCommand(datetime.datetime):
    """
    A command to be executed after some delay (seconds or timedelta).
    """

    @classmethod
    def from_datetime(cls, other):
        return cls(
            other.year,
            other.month,
            other.day,
            other.hour,
            other.minute,
            other.second,
            other.microsecond,
            other.tzinfo,
        )

    @classmethod
    def after(cls, delay, target):
        if not isinstance(delay, datetime.timedelta):
            delay = datetime.timedelta(seconds=delay)
        due_time = now() + delay
        cmd = cls.from_datetime(due_time)
        cmd.delay = delay
        cmd.target = target
        return cmd

    @staticmethod
    def _from_timestamp(input):
        """
        If input is a real number, interpret it as a Unix timestamp
        (seconds sinc Epoch in UTC) and return a timezone-aware
        datetime object. Otherwise return input unchanged.
        """
        if not isinstance(input, numbers.Real):
            return input
        return from_timestamp(input)

    @classmethod
    def at_time(cls, at, target):
        """
        Construct a DelayedCommand to come due at `at`, where `at` may be
        a datetime or timestamp.
        """
        at = cls._from_timestamp(at)
        cmd = cls.from_datetime(at)
        cmd.delay = at - now()
        cmd.target = target
        return cmd

    def due(self):
        return now() >= self


class PeriodicCommand(DelayedCommand):
    """
    Like a delayed command, but expect this command to run every delay
    seconds.
    """

    def _next_time(self):
        """
        Add delay to self, localized
        """
        return self + self.delay

    def next(self):
        cmd = self.__class__.from_datetime(self._next_time())
        cmd.delay = self.delay
        cmd.target = self.target
        return cmd

    def __setattr__(self, key, value):
        if key == 'delay' and not value > datetime.timedelta():
            raise ValueError("A PeriodicCommand must have a positive, non-zero delay.")
        super().__setattr__(key, value)


class PeriodicCommandFixedDelay(PeriodicCommand):
    """
    Like a periodic command, but don't calculate the delay based on
    the current time. Instead use a fixed delay following the initial
    run.
    """

    @classmethod
    def at_time(cls, at, delay, target):
        """
        >>> cmd = PeriodicCommandFixedDelay.at_time(0, 30, None)
        >>> cmd.delay.total_seconds()
        30.0
        """
        at = cls._from_timestamp(at)
        cmd = cls.from_datetime(at)
        if isinstance(delay, numbers.Number):
            delay = datetime.timedelta(seconds=delay)
        cmd.delay = delay
        cmd.target = target
        return cmd

    @classmethod
    def daily_at(cls, at, target):
        """
        Schedule a command to run at a specific time each day.

        >>> from tempora import utc
        >>> noon = utc.time(12, 0)
        >>> cmd = PeriodicCommandFixedDelay.daily_at(noon, None)
        >>> cmd.delay.total_seconds()
        86400.0
        """
        daily = datetime.timedelta(days=1)
        # convert when to the next datetime matching this time
        when = datetime.datetime.combine(datetime.date.today(), at)
        when -= daily
        while when < now():
            when += daily
        return cls.at_time(when, daily, target)


class Scheduler:
    """
    A rudimentary abstract scheduler accepting DelayedCommands
    and dispatching them on schedule.
    """

    def __init__(self):
        self.queue = []

    def add(self, command):
        assert isinstance(command, DelayedCommand)
        bisect.insort(self.queue, command)

    def run_pending(self):
        while self.queue:
            command = self.queue[0]
            if not command.due():
                break
            self.run(command)
            if isinstance(command, PeriodicCommand):
                self.add(command.next())
            del self.queue[0]

    @abc.abstractmethod
    def run(self, command):
        """
        Run the command
        """


class InvokeScheduler(Scheduler):
    """
    Command targets are functions to be invoked on schedule.
    """

    def run(self, command):
        command.target()


class CallbackScheduler(Scheduler):
    """
    Command targets are passed to a dispatch callable on schedule.
    """

    def __init__(self, dispatch):
        super().__init__()
        self.dispatch = dispatch

    def run(self, command):
        self.dispatch(command.target)