File: windows_events.pyi

package info (click to toggle)
typeshed 0.0~git20241223.ea91db2-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 28,756 kB
  • sloc: python: 7,741; makefile: 20; sh: 18
file content (100 lines) | stat: -rw-r--r-- 4,576 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import socket
import sys
from _typeshed import Incomplete, ReadableBuffer, WriteableBuffer
from collections.abc import Callable
from typing import IO, Any, ClassVar, Final, NoReturn

from . import events, futures, proactor_events, selector_events, streams, windows_utils

if sys.platform == "win32":
    if sys.version_info >= (3, 13):
        # 3.13 added `EventLoop`.
        __all__ = (
            "SelectorEventLoop",
            "ProactorEventLoop",
            "IocpProactor",
            "DefaultEventLoopPolicy",
            "WindowsSelectorEventLoopPolicy",
            "WindowsProactorEventLoopPolicy",
            "EventLoop",
        )
    else:
        __all__ = (
            "SelectorEventLoop",
            "ProactorEventLoop",
            "IocpProactor",
            "DefaultEventLoopPolicy",
            "WindowsSelectorEventLoopPolicy",
            "WindowsProactorEventLoopPolicy",
        )

    NULL: Final = 0
    INFINITE: Final = 0xFFFFFFFF
    ERROR_CONNECTION_REFUSED: Final = 1225
    ERROR_CONNECTION_ABORTED: Final = 1236
    CONNECT_PIPE_INIT_DELAY: float
    CONNECT_PIPE_MAX_DELAY: float

    class PipeServer:
        def __init__(self, address: str) -> None: ...
        def __del__(self) -> None: ...
        def closed(self) -> bool: ...
        def close(self) -> None: ...

    class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop): ...

    class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
        def __init__(self, proactor: IocpProactor | None = None) -> None: ...
        async def create_pipe_connection(
            self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
        ) -> tuple[proactor_events._ProactorDuplexPipeTransport, streams.StreamReaderProtocol]: ...
        async def start_serving_pipe(
            self, protocol_factory: Callable[[], streams.StreamReaderProtocol], address: str
        ) -> list[PipeServer]: ...

    class IocpProactor:
        def __init__(self, concurrency: int = 0xFFFFFFFF) -> None: ...
        def __del__(self) -> None: ...
        def set_loop(self, loop: events.AbstractEventLoop) -> None: ...
        def select(self, timeout: int | None = None) -> list[futures.Future[Any]]: ...
        def recv(self, conn: socket.socket, nbytes: int, flags: int = 0) -> futures.Future[bytes]: ...
        def recv_into(self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0) -> futures.Future[Any]: ...
        def recvfrom(
            self, conn: socket.socket, nbytes: int, flags: int = 0
        ) -> futures.Future[tuple[bytes, socket._RetAddress]]: ...
        def sendto(
            self, conn: socket.socket, buf: ReadableBuffer, flags: int = 0, addr: socket._Address | None = None
        ) -> futures.Future[int]: ...
        def send(self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0) -> futures.Future[Any]: ...
        def accept(self, listener: socket.socket) -> futures.Future[Any]: ...
        def connect(
            self,
            conn: socket.socket,
            address: tuple[Incomplete, Incomplete] | tuple[Incomplete, Incomplete, Incomplete, Incomplete],
        ) -> futures.Future[Any]: ...
        def sendfile(self, sock: socket.socket, file: IO[bytes], offset: int, count: int) -> futures.Future[Any]: ...
        def accept_pipe(self, pipe: socket.socket) -> futures.Future[Any]: ...
        async def connect_pipe(self, address: str) -> windows_utils.PipeHandle: ...
        def wait_for_handle(self, handle: windows_utils.PipeHandle, timeout: int | None = None) -> bool: ...
        def close(self) -> None: ...
        if sys.version_info >= (3, 11):
            def recvfrom_into(
                self, conn: socket.socket, buf: WriteableBuffer, flags: int = 0
            ) -> futures.Future[tuple[int, socket._RetAddress]]: ...

    SelectorEventLoop = _WindowsSelectorEventLoop

    class WindowsSelectorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
        _loop_factory: ClassVar[type[SelectorEventLoop]]
        if sys.version_info < (3, 14):
            def get_child_watcher(self) -> NoReturn: ...
            def set_child_watcher(self, watcher: Any) -> NoReturn: ...

    class WindowsProactorEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
        _loop_factory: ClassVar[type[ProactorEventLoop]]
        def get_child_watcher(self) -> NoReturn: ...
        def set_child_watcher(self, watcher: Any) -> NoReturn: ...

    DefaultEventLoopPolicy = WindowsSelectorEventLoopPolicy
    if sys.version_info >= (3, 13):
        EventLoop = ProactorEventLoop