1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
|
import socket
import sys
from _typeshed import Incomplete, ReadableBuffer, WriteableBuffer
from collections.abc import Callable
from typing import IO, Any, ClassVar, Final, NoReturn
from . import events, futures, proactor_events, selector_events, streams, windows_utils
if sys.platform == "win32":
if sys.version_info >= (3, 13):
# 3.13 added `EventLoop`.
__all__ = (
"SelectorEventLoop",
"ProactorEventLoop",
"IocpProactor",
"DefaultEventLoopPolicy",
"WindowsSelectorEventLoopPolicy",
"WindowsProactorEventLoopPolicy",
"EventLoop",
)
else:
__all__ = (
"SelectorEventLoop",
"ProactorEventLoop",
"IocpProactor",
"DefaultEventLoopPolicy",
"WindowsSelectorEventLoopPolicy",
"WindowsProactorEventLoopPolicy",
)
NULL: Final = 0
INFINITE: Final = 0xFFFFFFFF
ERROR_CONNECTION_REFUSED: Final = 1225
ERROR_CONNECTION_ABORTED: Final = 1236
CONNECT_PIPE_INIT_DELAY: float
CONNECT_PIPE_MAX_DELAY: float
class PipeServer:
def __init__(self, address: str) -> None: ...
def __del__(self) -> None: ...
def closed(self) -> bool: ...
def close(self) -> None: ...
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
def __init__(self, proactor: IocpProactor | None = None) -> None: ...
async def create_pipe_connection(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
async def start_serving_pipe(
self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
) -> list[PipeServer]: ...
class IocpProactor:
def __init__(self, concurrency: int = 0xFFFFFFFF) -> None: ...
def __del__(self) -> None: ...
def set_loop(self, loop: events.AbstractEventLoop) -> None: ...
def select(self, timeout: int | None = None) -> list[futures.Future[Any]]: ...
def recv(self, conn: socket.socket, nbytes: int, flags: int = 0) -> futures.Future[bytes]: ...
def recv_into(self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0) -> futures.Future[Any]: ...
def recvfrom(
self, conn: socket.socket, nbytes: int, flags: int = 0
) -> futures.Future[tuple[bytes, socket._RetAddress]]: ...
def sendto(
self, conn: socket.socket, buf: ReadableBuffer, flags: int = 0, addr: socket._Address | None = None
) -> futures.Future[int]: ...
def send(self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0) -> futures.Future[Any]: ...
def accept(self, listener: socket.socket) -> futures.Future[Any]: ...
def connect(
self,
conn: socket.socket,
address: tuple[Incomplete, Incomplete] | tuple[Incomplete, Incomplete, Incomplete, Incomplete],
) -> futures.Future[Any]: ...
def sendfile(self, sock: socket.socket, file: IO[bytes], offset: int, count: int) -> futures.Future[Any]: ...
def accept_pipe(self, pipe: socket.socket) -> futures.Future[Any]: ...
async def connect_pipe(self, address: str) -> windows_utils.PipeHandle: ...
def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: int | None = None) -> bool: ...
def close(self) -> None: ...
if sys.version_info >= (3, 11):
def recvfrom_into(
self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0
) -> futures.Future[tuple[int, socket._RetAddress]]: ...
SelectorEventLoop = _WindowsSelectorEventLoop
class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[type[SelectorEventLoop]]
if sys.version_info < (3, 14):
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
_loop_factory: ClassVar[type[ProactorEventLoop]]
def get_child_watcher(self) -> NoReturn: ...
def set_child_watcher(self, watcher: Any) -> NoReturn: ...
DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
if sys.version_info >= (3, 13):
EventLoop = ProactorEventLoop
|