File: _stream.py

package info (click to toggle)
python-tiny-proxy 0.2.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 212 kB
  • sloc: python: 1,154; makefile: 3
file content (42 lines) | stat: -rw-r--r-- 1,397 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import anyio
import anyio.abc
from anyio.streams.buffered import BufferedByteReceiveStream

DEFAULT_RECEIVE_SIZE = 65536


class SocketStream:
    def __init__(self, stream: anyio.abc.SocketStream):
        self._stream = stream
        self._buffered = BufferedByteReceiveStream(stream)
        self._closing = False

    async def send(self, data: bytes) -> None:
        await self._stream.send(data)

    async def send_eof(self) -> None:
        await self._stream.send_eof()

    async def receive(self, max_bytes=DEFAULT_RECEIVE_SIZE) -> bytes:
        return await self._buffered.receive(max_bytes)

    async def receive_exactly(self, n) -> bytes:
        return await self._buffered.receive_exactly(n)

    async def receive_until(self, delimiter: bytes, max_bytes: int) -> bytes:
        return await self._buffered.receive_until(delimiter, max_bytes)

    async def aclose(self):
        if not self._closing:
            self._closing = True
            try:
                # underlying TLSStream.aclose() -> TLSStream.unwrap()
                await self._buffered.aclose()
            except (anyio.BrokenResourceError, anyio.BusyResourceError):
                pass

    def getpeername(self):
        return self._stream.extra(anyio.abc.SocketAttribute.remote_address, '')

    def getsockname(self):
        return self._stream.extra(anyio.abc.SocketAttribute.local_address, '')