File: async_utils.py

package info (click to toggle)
knot-resolver 6.0.17-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,376 kB
  • sloc: javascript: 42,732; ansic: 40,311; python: 12,580; cpp: 2,121; sh: 1,988; xml: 193; makefile: 181
file content (129 lines) | stat: -rw-r--r-- 4,000 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import os
import pkgutil
import signal
import sys
import time
from asyncio import create_subprocess_exec, create_subprocess_shell
from pathlib import PurePath
from threading import Thread
from typing import Any, Dict, Generic, List, Optional, TypeVar, Union

from knot_resolver.utils.compat.asyncio import to_thread


def unblock_signals():
    if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
        signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())  # type: ignore
    else:
        # the list of signals is not exhaustive, but it should cover all signals we might ever want to block
        signal.pthread_sigmask(
            signal.SIG_UNBLOCK,
            {
                signal.SIGHUP,
                signal.SIGINT,
                signal.SIGTERM,
                signal.SIGUSR1,
                signal.SIGUSR2,
            },
        )


async def call(
    cmd: Union[str, bytes, List[str], List[bytes]], shell: bool = False, discard_output: bool = False
) -> int:
    """
    custom async alternative to subprocess.call()
    """
    kwargs: Dict[str, Any] = {
        "preexec_fn": unblock_signals,
    }
    if discard_output:
        kwargs["stdout"] = asyncio.subprocess.DEVNULL
        kwargs["stderr"] = asyncio.subprocess.DEVNULL

    if shell:
        if isinstance(cmd, list):
            raise RuntimeError("can't use list of arguments with shell=True")
        proc = await create_subprocess_shell(cmd, **kwargs)
    else:
        if not isinstance(cmd, list):
            raise RuntimeError(
                "Please use list of arguments, not a single string. It will prevent ambiguity when parsing"
            )
        proc = await create_subprocess_exec(*cmd, **kwargs)

    return await proc.wait()


async def readfile(path: Union[str, PurePath]) -> str:
    """
    asynchronously read whole file and return its content
    """

    def readfile_sync(path: Union[str, PurePath]) -> str:
        with open(path, "r", encoding="utf8") as f:
            return f.read()

    return await to_thread(readfile_sync, path)


async def writefile(path: Union[str, PurePath], content: str) -> None:
    """
    asynchronously set content of a file to a given string `content`.
    """

    def writefile_sync(path: Union[str, PurePath], content: str) -> int:
        with open(path, "w", encoding="utf8") as f:
            return f.write(content)

    await to_thread(writefile_sync, path, content)


async def wait_for_process_termination(pid: int, sleep_sec: float = 0) -> None:
    """
    will wait for any process (does not have to be a child process) given by its PID to terminate

    sleep_sec configures the granularity, with which we should return
    """

    def wait_sync(pid: int, sleep_sec: float) -> None:
        while True:
            try:
                os.kill(pid, 0)
                if sleep_sec == 0:
                    os.sched_yield()
                else:
                    time.sleep(sleep_sec)
            except ProcessLookupError:
                break

    await to_thread(wait_sync, pid, sleep_sec)


async def read_resource(package: str, filename: str) -> Optional[bytes]:
    return await to_thread(pkgutil.get_data, package, filename)


T = TypeVar("T")


class BlockingEventDispatcher(Thread, Generic[T]):
    def __init__(self, name: str = "blocking_event_dispatcher") -> None:
        super().__init__(name=name, daemon=True)
        # warning: the asyncio queue is not thread safe
        self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
        self._main_event_loop = asyncio.get_event_loop()

    def dispatch_event(self, event: T) -> None:
        """
        Method to dispatch events from the blocking thread
        """

        async def add_to_queue():
            await self._removed_unit_names.put(event)

        self._main_event_loop.call_soon_threadsafe(add_to_queue)

    async def next_event(self) -> T:
        return await self._removed_unit_names.get()