1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
import asyncio
import asyncio.trsock
import pathlib
import socket
import sys
from socket import _Address, _RetAddress
from typing import Any, Optional, Tuple, Union
class TransportClosed(Exception):
pass
class DatagramStream:
# Support type-checking in unittests which mock this
_drained: asyncio.Event
def __init__(
self,
transport: asyncio.DatagramProtocol,
recvq: asyncio.Queue[tuple[Optional[bytes], Optional[_Address]]],
excq: asyncio.Queue[Exception],
drained: asyncio.Event,
) -> None: ...
@property
def exception(self) -> None: ...
@property
def sockname(self) -> _RetAddress: ...
@property
def peername(self) -> _RetAddress: ...
@property
def socket(self) -> socket.socket | asyncio.trsock.TransportSocket: ...
def close(self) -> None: ...
async def _send(self, data: bytes, addr: Optional[_Address]) -> None: ...
async def recv(self) -> Tuple[bytes, _Address]: ...
class DatagramServer(DatagramStream):
async def send(self, data: bytes, addr: _Address) -> None: ...
class DatagramClient(DatagramStream):
async def send(self, data: bytes) -> None: ...
class Protocol(asyncio.DatagramProtocol):
# Support type-checking in unittests which mock this
_drained: asyncio.Event
def __init__(
self, recvq: asyncio.Queue[tuple[Optional[bytes], Optional[_Address]]], excq: asyncio.Queue[Exception], drained: asyncio.Event
) -> None: ...
def connection_made(self, transport: asyncio.BaseTransport) -> None: ...
def connection_lost(self, exc: Optional[Exception]) -> None: ...
def datagram_received(self, data: bytes, addr: _Address) -> None: ...
def error_received(self, exc: Exception) -> None: ...
def pause_writing(self) -> None: ...
def resume_writing(self) -> None: ...
async def bind(addr: Union[_Address, pathlib.Path, str], reuse_port: Optional[bool] = None) -> DatagramServer: ...
async def connect(addr: Union[_Address, pathlib.Path, str]) -> DatagramClient: ...
async def from_socket(sock: socket.socket) -> Union[DatagramServer, DatagramClient]: ...
|