File: common.py

package info (click to toggle)
python-yalexs 9.2.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,120 kB
  • sloc: python: 7,916; makefile: 3; sh: 2
file content (34 lines) | stat: -rw-r--r-- 1,115 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from __future__ import annotations

import asyncio
import time
from asyncio import AbstractEventLoop, TimerHandle
from typing import Union

# Resolution of the "monotonic" clock as reported by time.get_clock_info().
# fire_time_changed() adds it to loop.time() when deciding whether a timer is due.
_MONOTONIC_RESOLUTION = time.get_clock_info("monotonic").resolution
# Shape of one entry in loop._scheduled as handled by
# get_scheduled_timer_handles(): either a bare TimerHandle or a
# (float, TimerHandle) pair.
ScheduledType = Union[TimerHandle, tuple[float, TimerHandle]]


def get_scheduled_timer_handles(loop: AbstractEventLoop) -> list[TimerHandle]:
    """Collect the TimerHandle objects held in ``loop._scheduled``.

    Each entry of the loop's private ``_scheduled`` collection is either a
    ``TimerHandle`` itself or a pair whose second element is the
    ``TimerHandle``; in the latter case the handle is unwrapped.

    Args:
        loop: Event loop whose ``_scheduled`` attribute is read.

    Returns:
        A newly built list of handles, in the iteration order of
        ``loop._scheduled``. Cancelled handles are not filtered out.
    """
    scheduled: list[ScheduledType] = loop._scheduled  # type: ignore[attr-defined]
    timers: list[TimerHandle] = []
    for entry in scheduled:
        if isinstance(entry, TimerHandle):
            timers.append(entry)
        else:
            # Pair form: the handle sits at index 1.
            timers.append(entry[1])
    return timers


def fire_time_changed() -> None:
    """Run the running loop's scheduled timers that are already due.

    A ``time.time()`` reading is taken on entry. For every scheduled,
    non-cancelled ``TimerHandle`` the function compares

    * the entry reading minus a fresh ``time.time()`` reading, against
    * ``handle.when()`` minus ``loop.time()`` plus the monotonic clock
      resolution,

    and, when the first value is greater than or equal to the second, invokes
    the handle's callback via ``_run()`` and then cancels the handle.

    NOTE(review): presumably meant for tests that patch the clock -- confirm
    against callers.

    Raises:
        RuntimeError: If there is no running event loop
            (raised by ``asyncio.get_running_loop()``).
    """
    reference = time.time()
    loop = asyncio.get_running_loop()

    # The helper builds a fresh list, so timers scheduled by callbacks run
    # below do not affect this iteration.
    for handle in get_scheduled_timer_handles(loop):
        if not isinstance(handle, asyncio.TimerHandle) or handle.cancelled():
            continue

        wall_offset = reference - time.time()
        remaining = handle.when() - (loop.time() + _MONOTONIC_RESOLUTION)

        if wall_offset >= remaining:
            handle._run()
            # Mark it cancelled after running; presumably so the loop does
            # not invoke the same callback a second time.
            handle.cancel()