1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import ssl
import sys
from _typeshed import ReadableBuffer, StrPath
from collections.abc import Awaitable, Callable, Iterable, Sequence, Sized
from types import ModuleType
from typing import Any, Protocol, SupportsIndex
from typing_extensions import Self, TypeAlias
from . import events, protocols, transports
from .base_events import Server
if sys.platform == "win32":
__all__ = ("StreamReader", "StreamWriter", "StreamReaderProtocol", "open_connection", "start_server")
else:
__all__ = (
"StreamReader",
"StreamWriter",
"StreamReaderProtocol",
"open_connection",
"start_server",
"open_unix_connection",
"start_unix_server",
)
_ClientConnectedCallback: TypeAlias = Callable[[StreamReader, StreamWriter], Awaitable[None] | None]
class _ReaduntilBuffer(ReadableBuffer, Sized, Protocol): ...
if sys.version_info >= (3, 10):
async def open_connection(
host: str | None = None,
port: int | str | None = None,
*,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> tuple[StreamReader, StreamWriter]: ...
async def start_server(
client_connected_cb: _ClientConnectedCallback,
host: str | Sequence[str] | None = None,
port: int | str | None = None,
*,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> Server: ...
else:
async def open_connection(
host: str | None = None,
port: int | str | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> tuple[StreamReader, StreamWriter]: ...
async def start_server(
client_connected_cb: _ClientConnectedCallback,
host: str | None = None,
port: int | str | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
ssl_handshake_timeout: float | None = ...,
**kwds: Any,
) -> Server: ...
if sys.platform != "win32":
if sys.version_info >= (3, 10):
async def open_unix_connection(
path: StrPath | None = None, *, limit: int = 65536, **kwds: Any
) -> tuple[StreamReader, StreamWriter]: ...
async def start_unix_server(
client_connected_cb: _ClientConnectedCallback, path: StrPath | None = None, *, limit: int = 65536, **kwds: Any
) -> Server: ...
else:
async def open_unix_connection(
path: StrPath | None = None, *, loop: events.AbstractEventLoop | None = None, limit: int = 65536, **kwds: Any
) -> tuple[StreamReader, StreamWriter]: ...
async def start_unix_server(
client_connected_cb: _ClientConnectedCallback,
path: StrPath | None = None,
*,
loop: events.AbstractEventLoop | None = None,
limit: int = 65536,
**kwds: Any,
) -> Server: ...
class FlowControlMixin(protocols.Protocol):
def __init__(self, loop: events.AbstractEventLoop | None = None) -> None: ...
class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
def __init__(
self,
stream_reader: StreamReader,
client_connected_cb: _ClientConnectedCallback | None = None,
loop: events.AbstractEventLoop | None = None,
) -> None: ...
def __del__(self) -> None: ...
class StreamWriter:
def __init__(
self,
transport: transports.WriteTransport,
protocol: protocols.BaseProtocol,
reader: StreamReader | None,
loop: events.AbstractEventLoop,
) -> None: ...
@property
def transport(self) -> transports.WriteTransport: ...
def write(self, data: bytes | bytearray | memoryview) -> None: ...
def writelines(self, data: Iterable[bytes | bytearray | memoryview]) -> None: ...
def write_eof(self) -> None: ...
def can_write_eof(self) -> bool: ...
def close(self) -> None: ...
def is_closing(self) -> bool: ...
async def wait_closed(self) -> None: ...
def get_extra_info(self, name: str, default: Any = None) -> Any: ...
async def drain(self) -> None: ...
if sys.version_info >= (3, 12):
async def start_tls(
self,
sslcontext: ssl.SSLContext,
*,
server_hostname: str | None = None,
ssl_handshake_timeout: float | None = None,
ssl_shutdown_timeout: float | None = None,
) -> None: ...
elif sys.version_info >= (3, 11):
async def start_tls(
self, sslcontext: ssl.SSLContext, *, server_hostname: str | None = None, ssl_handshake_timeout: float | None = None
) -> None: ...
if sys.version_info >= (3, 13):
def __del__(self, warnings: ModuleType = ...) -> None: ...
elif sys.version_info >= (3, 11):
def __del__(self) -> None: ...
class StreamReader:
def __init__(self, limit: int = 65536, loop: events.AbstractEventLoop | None = None) -> None: ...
def exception(self) -> Exception: ...
def set_exception(self, exc: Exception) -> None: ...
def set_transport(self, transport: transports.BaseTransport) -> None: ...
def feed_eof(self) -> None: ...
def at_eof(self) -> bool: ...
def feed_data(self, data: Iterable[SupportsIndex]) -> None: ...
async def readline(self) -> bytes: ...
if sys.version_info >= (3, 13):
async def readuntil(self, separator: _ReaduntilBuffer | tuple[_ReaduntilBuffer, ...] = b"\n") -> bytes: ...
else:
async def readuntil(self, separator: _ReaduntilBuffer = b"\n") -> bytes: ...
async def read(self, n: int = -1) -> bytes: ...
async def readexactly(self, n: int) -> bytes: ...
def __aiter__(self) -> Self: ...
async def __anext__(self) -> bytes: ...
|