1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267
|
from __future__ import annotations
import asyncio
import logging
import random
import sys
import warnings
from collections.abc import Callable, Coroutine
from typing import Any, TypeVar
import tornado
__all__ = ["logging_names", "PeriodicCallback", "to_thread", "randbytes"]
logging_names: dict[str | int, int | str] = {}
logging_names.update(logging._levelToName) # type: ignore
logging_names.update(logging._nameToLevel) # type: ignore
LINUX = sys.platform == "linux"
MACOS = sys.platform == "darwin"
WINDOWS = sys.platform == "win32"
def to_thread(*args, **kwargs):
warnings.warn(
"to_thread is deprecated and will be removed in a future release; use asyncio.to_thread instead.",
DeprecationWarning,
stacklevel=2,
)
return asyncio.to_thread(*args, **kwargs)
def randbytes(*args, **kwargs):
warnings.warn(
"randbytes is deprecated and will be removed in a future release; use random.randbytes instead.",
DeprecationWarning,
stacklevel=2,
)
return random.randbytes(*args, **kwargs)
if tornado.version_info >= (6, 2, 0, 0):
from tornado.ioloop import PeriodicCallback
else:
# Backport from https://github.com/tornadoweb/tornado/blob/a4f08a31a348445094d1efa17880ed5472db9f7d/tornado/ioloop.py#L838-L962
# License https://github.com/tornadoweb/tornado/blob/v6.2.0/LICENSE
# Includes minor modifications to source code to pass linting
# This backport ensures that async callbacks are not overlapping if a run
# takes longer than the interval
import datetime
import math
from collections.abc import Awaitable
from inspect import isawaitable
from tornado.ioloop import IOLoop
from tornado.log import app_log
class PeriodicCallback: # type: ignore[no-redef]
"""Schedules the given callback to be called periodically.
The callback is called every ``callback_time`` milliseconds when
``callback_time`` is a float. Note that the timeout is given in
milliseconds, while most other time-related functions in Tornado use
seconds. ``callback_time`` may alternatively be given as a
`datetime.timedelta` object.
If ``jitter`` is specified, each callback time will be randomly selected
within a window of ``jitter * callback_time`` milliseconds.
Jitter can be used to reduce alignment of events with similar periods.
A jitter of 0.1 means allowing a 10% variation in callback time.
The window is centered on ``callback_time`` so the total number of calls
within a given interval should not be significantly affected by adding
jitter.
If the callback runs for longer than ``callback_time`` milliseconds,
subsequent invocations will be skipped to get back on schedule.
`start` must be called after the `PeriodicCallback` is created.
.. versionchanged:: 5.0
The ``io_loop`` argument (deprecated since version 4.1) has been removed.
.. versionchanged:: 5.1
The ``jitter`` argument is added.
.. versionchanged:: 6.2
If the ``callback`` argument is a coroutine, and a callback runs for
longer than ``callback_time``, subsequent invocations will be skipped.
Previously this was only true for regular functions, not coroutines,
which were "fire-and-forget" for `PeriodicCallback`.
The ``callback_time`` argument now accepts `datetime.timedelta` objects,
in addition to the previous numeric milliseconds.
"""
def __init__(
self,
callback: Callable[[], Awaitable | None],
callback_time: datetime.timedelta | float,
jitter: float = 0,
) -> None:
self.callback = callback
if isinstance(callback_time, datetime.timedelta):
self.callback_time = callback_time / datetime.timedelta(milliseconds=1)
else:
if callback_time <= 0:
raise ValueError(
"Periodic callback must have a positive callback_time"
)
self.callback_time = callback_time
self.jitter = jitter
self._running = False
self._timeout = None # type: object
def start(self) -> None:
"""Starts the timer."""
# Looking up the IOLoop here allows to first instantiate the
# PeriodicCallback in another thread, then start it using
# IOLoop.add_callback().
self.io_loop = IOLoop.current()
self._running = True
self._next_timeout = self.io_loop.time()
self._schedule_next()
def stop(self) -> None:
"""Stops the timer."""
self._running = False
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = None
def is_running(self) -> bool:
"""Returns ``True`` if this `.PeriodicCallback` has been started.
.. versionadded:: 4.1
"""
return self._running
async def _run(self) -> None:
if not self._running:
return
try:
val = self.callback()
if val is not None and isawaitable(val):
await val
except Exception:
app_log.error("Exception in callback %r", self.callback, exc_info=True)
finally:
self._schedule_next()
def _schedule_next(self) -> None:
if self._running:
self._update_next(self.io_loop.time())
self._timeout = self.io_loop.add_timeout(self._next_timeout, self._run)
def _update_next(self, current_time: float) -> None:
callback_time_sec = self.callback_time / 1000.0
if self.jitter:
# apply jitter fraction
callback_time_sec *= 1 + (self.jitter * (random.random() - 0.5))
if self._next_timeout <= current_time:
# The period should be measured from the start of one call
# to the start of the next. If one call takes too long,
# skip cycles to get back to a multiple of the original
# schedule.
self._next_timeout += (
math.floor((current_time - self._next_timeout) / callback_time_sec)
+ 1
) * callback_time_sec
else:
# If the clock moved backwards, ensure we advance the next
# timeout instead of recomputing the same value again.
# This may result in long gaps between callbacks if the
# clock jumps backwards by a lot, but the far more common
# scenario is a small NTP adjustment that should just be
# ignored.
#
# Note that on some systems if time.time() runs slower
# than time.monotonic() (most common on windows), we
# effectively experience a small backwards time jump on
# every iteration because PeriodicCallback uses
# time.time() while asyncio schedules callbacks using
# time.monotonic().
# https://github.com/tornadoweb/tornado/issues/2333
self._next_timeout += callback_time_sec
_T = TypeVar("_T")
if sys.version_info >= (3, 12):
asyncio_run = asyncio.run
elif sys.version_info >= (3, 11):
def asyncio_run(
main: Coroutine[Any, Any, _T],
*,
debug: bool = False,
loop_factory: Callable[[], asyncio.AbstractEventLoop] | None = None,
) -> _T:
# asyncio.run from Python 3.12
# https://docs.python.org/3/license.html#psf-license
with asyncio.Runner(debug=debug, loop_factory=loop_factory) as runner:
return runner.run(main)
else:
# modified version of asyncio.run from Python 3.10 to add loop_factory kwarg
# https://docs.python.org/3/license.html#psf-license
def asyncio_run(
main: Coroutine[Any, Any, _T],
*,
debug: bool = False,
loop_factory: Callable[[], asyncio.AbstractEventLoop] | None = None,
) -> _T:
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
raise RuntimeError(
"asyncio.run() cannot be called from a running event loop"
)
if not asyncio.iscoroutine(main):
raise ValueError(f"a coroutine was expected, got {main!r}")
if loop_factory is None:
loop = asyncio.new_event_loop()
else:
loop = loop_factory()
try:
if loop_factory is None:
asyncio.set_event_loop(loop)
if debug is not None:
loop.set_debug(debug)
return loop.run_until_complete(main)
finally:
try:
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
finally:
if loop_factory is None:
asyncio.set_event_loop(None)
loop.close()
def _cancel_all_tasks(loop: asyncio.AbstractEventLoop) -> None:
to_cancel = asyncio.all_tasks(loop)
if not to_cancel:
return
for task in to_cancel:
task.cancel()
loop.run_until_complete(asyncio.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
continue
if task.exception() is not None:
loop.call_exception_handler(
{
"message": "unhandled exception during asyncio.run() shutdown",
"exception": task.exception(),
"task": task,
}
)
|