File: context.pyi

package info (click to toggle)
typeshed 0.0~git20241223.ea91db2-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 28,756 kB
  • sloc: python: 7,741; makefile: 20; sh: 18
file content (206 lines) | stat: -rw-r--r-- 8,578 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import ctypes
import sys
from _ctypes import _CData
from collections.abc import Callable, Iterable, Sequence
from ctypes import _SimpleCData, c_char
from logging import Logger, _Level as _LoggingLevel
from multiprocessing import popen_fork, popen_forkserver, popen_spawn_posix, popen_spawn_win32, queues, synchronize
from multiprocessing.managers import SyncManager
from multiprocessing.pool import Pool as _Pool
from multiprocessing.process import BaseProcess
from multiprocessing.sharedctypes import Synchronized, SynchronizedArray, SynchronizedString
from typing import Any, ClassVar, Literal, TypeVar, overload
from typing_extensions import TypeAlias

if sys.platform != "win32":
    from multiprocessing.connection import Connection
else:
    from multiprocessing.connection import PipeConnection

__all__ = ()

_LockLike: TypeAlias = synchronize.Lock | synchronize.RLock
_T = TypeVar("_T")
_CT = TypeVar("_CT", bound=_CData)

class ProcessError(Exception): ...
class BufferTooShort(ProcessError): ...
class TimeoutError(ProcessError): ...
class AuthenticationError(ProcessError): ...

class BaseContext:
    ProcessError: ClassVar[type[ProcessError]]
    BufferTooShort: ClassVar[type[BufferTooShort]]
    TimeoutError: ClassVar[type[TimeoutError]]
    AuthenticationError: ClassVar[type[AuthenticationError]]

    # N.B. The methods below are applied at runtime to generate
    # multiprocessing.*, so the signatures should be identical (modulo self).
    @staticmethod
    def current_process() -> BaseProcess: ...
    @staticmethod
    def parent_process() -> BaseProcess | None: ...
    @staticmethod
    def active_children() -> list[BaseProcess]: ...
    def cpu_count(self) -> int: ...
    def Manager(self) -> SyncManager: ...

    # N.B. Keep this in sync with multiprocessing.connection.Pipe.
    # _ConnectionBase is the common base class of Connection and PipeConnection
    # and can be used in cross-platform code.
    #
    # The two connections should have the same generic types but inverted (Connection[_T1, _T2], Connection[_T2, _T1]).
    # However, TypeVars scoped entirely within a return annotation is unspecified in the spec.
    if sys.platform != "win32":
        def Pipe(self, duplex: bool = True) -> tuple[Connection[Any, Any], Connection[Any, Any]]: ...
    else:
        def Pipe(self, duplex: bool = True) -> tuple[PipeConnection[Any, Any], PipeConnection[Any, Any]]: ...

    def Barrier(
        self, parties: int, action: Callable[..., object] | None = None, timeout: float | None = None
    ) -> synchronize.Barrier: ...
    def BoundedSemaphore(self, value: int = 1) -> synchronize.BoundedSemaphore: ...
    def Condition(self, lock: _LockLike | None = None) -> synchronize.Condition: ...
    def Event(self) -> synchronize.Event: ...
    def Lock(self) -> synchronize.Lock: ...
    def RLock(self) -> synchronize.RLock: ...
    def Semaphore(self, value: int = 1) -> synchronize.Semaphore: ...
    def Queue(self, maxsize: int = 0) -> queues.Queue[Any]: ...
    def JoinableQueue(self, maxsize: int = 0) -> queues.JoinableQueue[Any]: ...
    def SimpleQueue(self) -> queues.SimpleQueue[Any]: ...
    def Pool(
        self,
        processes: int | None = None,
        initializer: Callable[..., object] | None = None,
        initargs: Iterable[Any] = (),
        maxtasksperchild: int | None = None,
    ) -> _Pool: ...
    @overload
    def RawValue(self, typecode_or_type: type[_CT], *args: Any) -> _CT: ...
    @overload
    def RawValue(self, typecode_or_type: str, *args: Any) -> Any: ...
    @overload
    def RawArray(self, typecode_or_type: type[_CT], size_or_initializer: int | Sequence[Any]) -> ctypes.Array[_CT]: ...
    @overload
    def RawArray(self, typecode_or_type: str, size_or_initializer: int | Sequence[Any]) -> Any: ...
    @overload
    def Value(
        self, typecode_or_type: type[_SimpleCData[_T]], *args: Any, lock: Literal[True] | _LockLike = True
    ) -> Synchronized[_T]: ...
    @overload
    def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[False]) -> Synchronized[_CT]: ...
    @overload
    def Value(self, typecode_or_type: type[_CT], *args: Any, lock: Literal[True] | _LockLike = True) -> Synchronized[_CT]: ...
    @overload
    def Value(self, typecode_or_type: str, *args: Any, lock: Literal[True] | _LockLike = True) -> Synchronized[Any]: ...
    @overload
    def Value(self, typecode_or_type: str | type[_CData], *args: Any, lock: bool | _LockLike = True) -> Any: ...
    @overload
    def Array(
        self, typecode_or_type: type[_SimpleCData[_T]], size_or_initializer: int | Sequence[Any], *, lock: Literal[False]
    ) -> SynchronizedArray[_T]: ...
    @overload
    def Array(
        self, typecode_or_type: type[c_char], size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
    ) -> SynchronizedString: ...
    @overload
    def Array(
        self,
        typecode_or_type: type[_SimpleCData[_T]],
        size_or_initializer: int | Sequence[Any],
        *,
        lock: Literal[True] | _LockLike = True,
    ) -> SynchronizedArray[_T]: ...
    @overload
    def Array(
        self, typecode_or_type: str, size_or_initializer: int | Sequence[Any], *, lock: Literal[True] | _LockLike = True
    ) -> SynchronizedArray[Any]: ...
    @overload
    def Array(
        self, typecode_or_type: str | type[_CData], size_or_initializer: int | Sequence[Any], *, lock: bool | _LockLike = True
    ) -> Any: ...
    def freeze_support(self) -> None: ...
    def get_logger(self) -> Logger: ...
    def log_to_stderr(self, level: _LoggingLevel | None = None) -> Logger: ...
    def allow_connection_pickling(self) -> None: ...
    def set_executable(self, executable: str) -> None: ...
    def set_forkserver_preload(self, module_names: list[str]) -> None: ...
    if sys.platform != "win32":
        @overload
        def get_context(self, method: None = None) -> DefaultContext: ...
        @overload
        def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
        @overload
        def get_context(self, method: Literal["fork"]) -> ForkContext: ...
        @overload
        def get_context(self, method: Literal["forkserver"]) -> ForkServerContext: ...
        @overload
        def get_context(self, method: str) -> BaseContext: ...
    else:
        @overload
        def get_context(self, method: None = None) -> DefaultContext: ...
        @overload
        def get_context(self, method: Literal["spawn"]) -> SpawnContext: ...
        @overload
        def get_context(self, method: str) -> BaseContext: ...

    @overload
    def get_start_method(self, allow_none: Literal[False] = False) -> str: ...
    @overload
    def get_start_method(self, allow_none: bool) -> str | None: ...
    def set_start_method(self, method: str | None, force: bool = False) -> None: ...
    @property
    def reducer(self) -> str: ...
    @reducer.setter
    def reducer(self, reduction: str) -> None: ...
    def _check_available(self) -> None: ...

class Process(BaseProcess):
    _start_method: str | None
    @staticmethod
    def _Popen(process_obj: BaseProcess) -> DefaultContext: ...

class DefaultContext(BaseContext):
    Process: ClassVar[type[Process]]
    def __init__(self, context: BaseContext) -> None: ...
    def get_start_method(self, allow_none: bool = False) -> str: ...
    def get_all_start_methods(self) -> list[str]: ...

_default_context: DefaultContext

class SpawnProcess(BaseProcess):
    _start_method: str
    if sys.platform != "win32":
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_spawn_posix.Popen: ...
    else:
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_spawn_win32.Popen: ...

class SpawnContext(BaseContext):
    _name: str
    Process: ClassVar[type[SpawnProcess]]

if sys.platform != "win32":
    class ForkProcess(BaseProcess):
        _start_method: str
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_fork.Popen: ...

    class ForkServerProcess(BaseProcess):
        _start_method: str
        @staticmethod
        def _Popen(process_obj: BaseProcess) -> popen_forkserver.Popen: ...

    class ForkContext(BaseContext):
        _name: str
        Process: ClassVar[type[ForkProcess]]

    class ForkServerContext(BaseContext):
        _name: str
        Process: ClassVar[type[ForkServerProcess]]

def _force_start_method(method: str) -> None: ...
def get_spawning_popen() -> Any | None: ...
def set_spawning_popen(popen: Any) -> None: ...
def assert_spawning(obj: Any) -> None: ...