File: proactor_events.pyi

package info (click to toggle)
typeshed 0.0~git20241223.ea91db2-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 28,756 kB
  • sloc: python: 7,741; makefile: 20; sh: 18
file content (65 lines) | stat: -rw-r--r-- 2,598 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import sys
from collections.abc import Mapping
from socket import socket
from typing import Any, ClassVar, Literal

from . import base_events, constants, events, futures, streams, transports

__all__ = ("BaseProactorEventLoop",)

class _ProactorBasePipeTransport(transports._FlowControlMixin, transports.BaseTransport):
    def __init__(
        self,
        loop: events.AbstractEventLoop,
        sock: socket,
        protocol: streams.StreamReaderProtocol,
        waiter: futures.Future[Any] | None = None,
        extra: Mapping[Any, Any] | None = None,
        server: events.AbstractServer | None = None,
    ) -> None: ...
    def __del__(self) -> None: ...

class _ProactorReadPipeTransport(_ProactorBasePipeTransport, transports.ReadTransport):
    if sys.version_info >= (3, 10):
        def __init__(
            self,
            loop: events.AbstractEventLoop,
            sock: socket,
            protocol: streams.StreamReaderProtocol,
            waiter: futures.Future[Any] | None = None,
            extra: Mapping[Any, Any] | None = None,
            server: events.AbstractServer | None = None,
            buffer_size: int = 65536,
        ) -> None: ...
    else:
        def __init__(
            self,
            loop: events.AbstractEventLoop,
            sock: socket,
            protocol: streams.StreamReaderProtocol,
            waiter: futures.Future[Any] | None = None,
            extra: Mapping[Any, Any] | None = None,
            server: events.AbstractServer | None = None,
        ) -> None: ...

class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport, transports.WriteTransport): ...
class _ProactorWritePipeTransport(_ProactorBaseWritePipeTransport): ...
class _ProactorDuplexPipeTransport(_ProactorReadPipeTransport, _ProactorBaseWritePipeTransport, transports.Transport): ...

class _ProactorSocketTransport(_ProactorReadPipeTransport, _ProactorBaseWritePipeTransport, transports.Transport):
    _sendfile_compatible: ClassVar[constants._SendfileMode]
    def __init__(
        self,
        loop: events.AbstractEventLoop,
        sock: socket,
        protocol: streams.StreamReaderProtocol,
        waiter: futures.Future[Any] | None = None,
        extra: Mapping[Any, Any] | None = None,
        server: events.AbstractServer | None = None,
    ) -> None: ...
    def _set_extra(self, sock: socket) -> None: ...
    def can_write_eof(self) -> Literal[True]: ...

class BaseProactorEventLoop(base_events.BaseEventLoop):
    def __init__(self, proactor: Any) -> None: ...
    async def sock_recv(self, sock: socket, n: int) -> bytes: ...