1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
import asyncio
import os
import pkgutil
import signal
import sys
import time
from asyncio import create_subprocess_exec, create_subprocess_shell
from pathlib import PurePath
from threading import Thread
from typing import Any, Dict, Generic, List, Optional, TypeVar, Union
from knot_resolver.utils.compat.asyncio import to_thread
def unblock_signals():
if sys.version_info.major >= 3 and sys.version_info.minor >= 8:
signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) # type: ignore
else:
# the list of signals is not exhaustive, but it should cover all signals we might ever want to block
signal.pthread_sigmask(
signal.SIG_UNBLOCK,
{
signal.SIGHUP,
signal.SIGINT,
signal.SIGTERM,
signal.SIGUSR1,
signal.SIGUSR2,
},
)
async def call(
cmd: Union[str, bytes, List[str], List[bytes]], shell: bool = False, discard_output: bool = False
) -> int:
"""
custom async alternative to subprocess.call()
"""
kwargs: Dict[str, Any] = {
"preexec_fn": unblock_signals,
}
if discard_output:
kwargs["stdout"] = asyncio.subprocess.DEVNULL
kwargs["stderr"] = asyncio.subprocess.DEVNULL
if shell:
if isinstance(cmd, list):
raise RuntimeError("can't use list of arguments with shell=True")
proc = await create_subprocess_shell(cmd, **kwargs)
else:
if not isinstance(cmd, list):
raise RuntimeError(
"Please use list of arguments, not a single string. It will prevent ambiguity when parsing"
)
proc = await create_subprocess_exec(*cmd, **kwargs)
return await proc.wait()
async def readfile(path: Union[str, PurePath]) -> str:
"""
asynchronously read whole file and return its content
"""
def readfile_sync(path: Union[str, PurePath]) -> str:
with open(path, "r", encoding="utf8") as f:
return f.read()
return await to_thread(readfile_sync, path)
async def writefile(path: Union[str, PurePath], content: str) -> None:
"""
asynchronously set content of a file to a given string `content`.
"""
def writefile_sync(path: Union[str, PurePath], content: str) -> int:
with open(path, "w", encoding="utf8") as f:
return f.write(content)
await to_thread(writefile_sync, path, content)
async def wait_for_process_termination(pid: int, sleep_sec: float = 0) -> None:
"""
will wait for any process (does not have to be a child process) given by its PID to terminate
sleep_sec configures the granularity, with which we should return
"""
def wait_sync(pid: int, sleep_sec: float) -> None:
while True:
try:
os.kill(pid, 0)
if sleep_sec == 0:
os.sched_yield()
else:
time.sleep(sleep_sec)
except ProcessLookupError:
break
await to_thread(wait_sync, pid, sleep_sec)
async def read_resource(package: str, filename: str) -> Optional[bytes]:
return await to_thread(pkgutil.get_data, package, filename)
T = TypeVar("T")
class BlockingEventDispatcher(Thread, Generic[T]):
def __init__(self, name: str = "blocking_event_dispatcher") -> None:
super().__init__(name=name, daemon=True)
# warning: the asyncio queue is not thread safe
self._removed_unit_names: "asyncio.Queue[T]" = asyncio.Queue()
self._main_event_loop = asyncio.get_event_loop()
def dispatch_event(self, event: T) -> None:
"""
Method to dispatch events from the blocking thread
"""
async def add_to_queue():
await self._removed_unit_names.put(event)
self._main_event_loop.call_soon_threadsafe(add_to_queue)
async def next_event(self) -> T:
return await self._removed_unit_names.get()
|