File: worker_context.py

package info (click to toggle)
python-urllib3 2.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,340 kB
  • sloc: python: 26,167; makefile: 122; javascript: 92; sh: 11
file content (38 lines) | stat: -rw-r--r-- 811 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from __future__ import annotations

import asyncio
from typing import Type, Union

from ..typing import Event


class EventWrapper:
    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def clear(self) -> None:
        self._event.clear()

    async def wait(self) -> None:
        await self._event.wait()

    async def set(self) -> None:
        self._event.set()

    def is_set(self) -> bool:
        return self._event.is_set()


class WorkerContext:
    event_class: Type[Event] = EventWrapper

    def __init__(self) -> None:
        self.terminated = self.event_class()

    @staticmethod
    async def sleep(wait: Union[float, int]) -> None:
        return await asyncio.sleep(wait)

    @staticmethod
    def time() -> float:
        return asyncio.get_event_loop().time()