1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206
|
import ctypes
import sys
from _ctypes import _CData
from collections.abc import Callable, Iterable, Sequence
from ctypes import _SimpleCData, c_char
from logging import Logger, _Level as _LoggingLevel
from multiprocessing import popen_fork, popen_forkserver, popen_spawn_posix, popen_spawn_win32, queues, synchronize
from multiprocessing.managers import SyncManager
from multiprocessing.pool import Pool as _Pool
from multiprocessing.process import BaseProcess
from multiprocessing.sharedctypes import Synchronized, SynchronizedArray, SynchronizedString
from typing import Any, ClassVar, Literal, TypeVar, overload
from typing_extensions import TypeAlias
if sys.platform != "win32":
from multiprocessing.connection import Connection
else:
from multiprocessing.connection import PipeConnection
__all__ = ()
_LockLike: TypeAlias = synchronize.Lock | synchronize.RLock
_T = TypeVar("_T")
_CT = TypeVar("_CT", bound=_CData)
class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...
class BaseContext:
ProcessError: ClassVar[type[ProcessError]]
BufferTooShort: ClassVar[type[BufferTooShort]]
TimeoutError: ClassVar[type[TimeoutError]]
AuthenticationError: ClassVar[type[AuthenticationError]]
# N.B. The methods below are applied at runtime to generate
# multiprocessing.*, so the signatures should be identical (modulo self).
@staticmethod
def current_process() -> BaseProcess: ...
@staticmethod
def parent_process() -> BaseProcess | None: ...
@staticmethod
def active_children() -> list[BaseProcess]: ...
def cpu_count(self) -> int: ...
def Manager(self) -> SyncManager: ...
# N.B. Keep this in sync with multiprocessing.connection.Pipe.
# _ConnectionBase is the common base class of Connection and PipeConnection
# and can be used in cross-platform code.
#
# The two connections should have the same generic types but inverted (Connection[_T1, _T2], Connection[_T2, _T1]).
# However, TypeVars scoped entirely within a return annotation is unspecified in the spec.
if sys.platform != "win32":
def Pipe(self, duplex: bool = True) -> tuple[Connection[Any, Any], Connection[Any, Any]]: ...
else:
def Pipe(self, duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]: ...
def Barrier(
self, parties: int, action: Callable[..., object] | None = None, timeout: float | None = None
) -> synchronize.Barrier: ...
def BoundedSemaphore(self, value: int = 1) -> synchronize.BoundedSemaphore: ...
def Condition(self, lock: _LockLike | None = None) -> synchronize.Condition: ...
def Event(self) -> synchronize.Event: ...
def Lock(self) -> synchronize.Lock: ...
def RLock(self) -> synchronize.RLock: ...
def Semaphore(self, value: int = 1) -> synchronize.Semaphore: ...
def Queue(self, maxsize: int = 0) -> queues.Queue[Any]: ...
def JoinableQueue(self, maxsize: int = 0) -> queues.JoinableQueue[Any]: ...
def SimpleQueue(self) -> queues.SimpleQueue[Any]: ...
def Pool(
self,
processes: int | None = None,
initializer: Callable[..., object] | None = None,
initargs: Iterable[Any] = (),
maxtasksperchild: int | None = None,
) -> _Pool: ...
@overload
def RawValue(self, typecode_or_type: type[_CT], *args: Any) -> _CT: ...
@overload
def RawValue(self, typecode_or_type: str, *args: Any) -> Any: ...
@overload
def RawArray(self, typecode_or_type: type[_CT], size_or_initializer: int | Sequence[Any]) -> ctypes.Array[_CT]: ...
@overload
def RawArray(self, typecode_or_type: str, size_or_initializer: int | Sequence[Any]) -> Any: ...
@overload
def Value(
self, typecode_or_type: type[_SimpleCData[_T]], *args: Any, lock: Literal[True] | _LockLike = True
) -> Synchronized[_T]: ...
@overload
def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[False]) -> Synchronized[_CT]: ...
@overload
def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[True] | _LockLike = True) -> Synchronized[_CT]: ...
@overload
def Value(self, typecode_or_type: str, *args: Any, lock: Literal[True] | _LockLike = True) -> Synchronized[Any]: ...
@overload
def Value(self, typecode_or_type: str | type[_CData], *args: Any, lock: bool | _LockLike = True) -> Any: ...
@overload
def Array(
self, typecode_or_type: type[_SimpleCData[_T]], size_or_initializer: int | Sequence[Any], *, lock: Literal[False]
) -> SynchronizedArray[_T]: ...
@overload
def Array(
self, typecode_or_type: type[c_char], size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
) -> SynchronizedString: ...
@overload
def Array(
self,
typecode_or_type: type[_SimpleCData[_T]],
size_or_initializer: int | Sequence[Any],
*,
lock: Literal[True] | _LockLike = True,
) -> SynchronizedArray[_T]: ...
@overload
def Array(
self, typecode_or_type: str, size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
) -> SynchronizedArray[Any]: ...
@overload
def Array(
self, typecode_or_type: str | type[_CData], size_or_initializer: int | Sequence[Any], *, lock: bool | _LockLike = True
) -> Any: ...
def freeze_support(self) -> None: ...
def get_logger(self) -> Logger: ...
def log_to_stderr(self, level: _LoggingLevel | None = None) -> Logger: ...
def allow_connection_pickling(self) -> None: ...
def set_executable(self, executable: str) -> None: ...
def set_forkserver_preload(self, module_names: list[str]) -> None: ...
if sys.platform != "win32":
@overload
def get_context(self, method: None = None) -> DefaultContext: ...
@overload
def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
@overload
def get_context(self, method: Literal["fork"]) -> ForkContext: ...
@overload
def get_context(self, method: Literal["forkserver"]) -> ForkServerContext: ...
@overload
def get_context(self, method: str) -> BaseContext: ...
else:
@overload
def get_context(self, method: None = None) -> DefaultContext: ...
@overload
def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
@overload
def get_context(self, method: str) -> BaseContext: ...
@overload
def get_start_method(self, allow_none: Literal[False] = False) -> str: ...
@overload
def get_start_method(self, allow_none: bool) -> str | None: ...
def set_start_method(self, method: str | None, force: bool = False) -> None: ...
@property
def reducer(self) -> str: ...
@reducer.setter
def reducer(self, reduction: str) -> None: ...
def _check_available(self) -> None: ...
class Process(BaseProcess):
_start_method: str | None
@staticmethod
def _Popen(process_obj: BaseProcess) -> DefaultContext: ...
class DefaultContext(BaseContext):
Process: ClassVar[type[Process]]
def __init__(self, context: BaseContext) -> None: ...
def get_start_method(self, allow_none: bool = False) -> str: ...
def get_all_start_methods(self) -> list[str]: ...
_default_context: DefaultContext
class SpawnProcess(BaseProcess):
_start_method: str
if sys.platform != "win32":
@staticmethod
def _Popen(process_obj: BaseProcess) -> popen_spawn_posix.Popen: ...
else:
@staticmethod
def _Popen(process_obj: BaseProcess) -> popen_spawn_win32.Popen: ...
class SpawnContext(BaseContext):
_name: str
Process: ClassVar[type[SpawnProcess]]
if sys.platform != "win32":
class ForkProcess(BaseProcess):
_start_method: str
@staticmethod
def _Popen(process_obj: BaseProcess) -> popen_fork.Popen: ...
class ForkServerProcess(BaseProcess):
_start_method: str
@staticmethod
def _Popen(process_obj: BaseProcess) -> popen_forkserver.Popen: ...
class ForkContext(BaseContext):
_name: str
Process: ClassVar[type[ForkProcess]]
class ForkServerContext(BaseContext):
_name: str
Process: ClassVar[type[ForkServerProcess]]
def _force_start_method(method: str) -> None: ...
def get_spawning_popen() -> Any | None: ...
def set_spawning_popen(popen: Any) -> None: ...
def assert_spawning(obj: Any) -> None: ...
|