1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
import threading
from _typeshed import ConvertibleToInt, Incomplete, Unused
from collections.abc import Callable, Iterable, Mapping, MutableMapping, Sequence
from logging import Logger, _Level as _LoggingLevel
from typing import Any, Final, Generic, TypeVar, overload
__all__ = [
"sub_debug",
"debug",
"info",
"sub_warning",
"get_logger",
"log_to_stderr",
"get_temp_dir",
"register_after_fork",
"is_exiting",
"Finalize",
"ForkAwareThreadLock",
"ForkAwareLocal",
"close_all_fds_except",
"SUBDEBUG",
"SUBWARNING",
]
_T = TypeVar("_T")
_R_co = TypeVar("_R_co", default=Any, covariant=True)
NOTSET: Final[int]
SUBDEBUG: Final[int]
DEBUG: Final[int]
INFO: Final[int]
SUBWARNING: Final[int]
LOGGER_NAME: Final[str]
DEFAULT_LOGGING_FORMAT: Final[str]
def sub_debug(msg: object, *args: object) -> None: ...
def debug(msg: object, *args: object) -> None: ...
def info(msg: object, *args: object) -> None: ...
def sub_warning(msg: object, *args: object) -> None: ...
def get_logger() -> Logger: ...
def log_to_stderr(level: _LoggingLevel | None = None) -> Logger: ...
def is_abstract_socket_namespace(address: str | bytes | None) -> bool: ...
abstract_sockets_supported: bool
def get_temp_dir() -> str: ...
def register_after_fork(obj: _T, func: Callable[[_T], object]) -> None: ...
class Finalize(Generic[_R_co]):
# "args" and "kwargs" are passed as arguments to "callback".
@overload
def __init__(
self,
obj: None,
callback: Callable[..., _R_co],
*,
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] | None = None,
exitpriority: int,
) -> None: ...
@overload
def __init__(
self, obj: None, callback: Callable[..., _R_co], args: Sequence[Any], kwargs: Mapping[str, Any] | None, exitpriority: int
) -> None: ...
@overload
def __init__(
self,
obj: Any,
callback: Callable[..., _R_co],
args: Sequence[Any] = (),
kwargs: Mapping[str, Any] | None = None,
exitpriority: int | None = None,
) -> None: ...
def __call__(
self,
wr: Unused = None,
_finalizer_registry: MutableMapping[Incomplete, Incomplete] = {},
sub_debug: Callable[..., object] = ...,
getpid: Callable[[], int] = ...,
) -> _R_co: ...
def cancel(self) -> None: ...
def still_active(self) -> bool: ...
def is_exiting() -> bool: ...
class ForkAwareThreadLock:
acquire: Callable[[bool, float], bool]
release: Callable[[], None]
def __enter__(self) -> bool: ...
def __exit__(self, *args: Unused) -> None: ...
class ForkAwareLocal(threading.local): ...
MAXFD: Final[int]
def close_all_fds_except(fds: Iterable[int]) -> None: ...
def spawnv_passfds(path: bytes, args: Sequence[ConvertibleToInt], passfds: Sequence[int]) -> int: ...
|