File: helpers.py

package info (click to toggle)
hypercorn 0.17.3-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 908 kB
  • sloc: python: 7,839; makefile: 24; sh: 6
file content (67 lines) | stat: -rw-r--r-- 1,638 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from __future__ import annotations

import asyncio
from typing import Optional, Union

from ..helpers import MockSocket


class MockSSLObject:
    """Minimal SSL-object stub that always reports HTTP/2 as the ALPN result.

    NOTE(review): presumably stands in for ``ssl.SSLObject`` in tests; only
    ``selected_alpn_protocol`` is provided.
    """

    def selected_alpn_protocol(self) -> str:
        """Return the fixed ALPN protocol identifier ``"h2"``."""
        negotiated = "h2"
        return negotiated


class MemoryReader:
    """Queue-backed reader: chunks fed in with ``send`` come back out of ``read``.

    Each ``read`` call hands back exactly one queued chunk, in FIFO order.
    ``close`` enqueues an empty chunk and flags end-of-file.
    """

    def __init__(self) -> None:
        """Start with no buffered chunks and the end-of-file flag cleared."""
        self.eof = False
        self.data: asyncio.Queue = asyncio.Queue()

    async def send(self, data: bytes) -> None:
        """Queue *data* for a later ``read``; an empty chunk is dropped."""
        if data == b"":
            # Empty chunks are reserved for the marker enqueued by close().
            return
        await self.data.put(data)

    async def read(self, length: int) -> bytes:
        """Wait for and return the next queued chunk.

        *length* is accepted but not used: the whole chunk is returned
        regardless of its size.
        """
        chunk = await self.data.get()
        return chunk

    def close(self) -> None:
        """Enqueue an empty chunk and set the end-of-file flag."""
        self.data.put_nowait(b"")
        self.eof = True

    def at_eof(self) -> bool:
        """Report whether ``close`` was called and every chunk has been read."""
        closed = self.eof
        if not closed:
            return closed
        return self.data.empty()


class MemoryWriter:
    def __init__(self, http2: bool = False) -> None:
        self.is_closed = False
        self.data: asyncio.Queue = asyncio.Queue()
        self.http2 = http2

    def get_extra_info(self, name: str) -> Optional[Union[MockSocket, MockSSLObject]]:
        if name == "socket":
            return MockSocket()
        elif self.http2 and name == "ssl_object":
            return MockSSLObject()
        else:
            return None

    def write_eof(self) -> None:
        self.data.put_nowait(b"")

    def write(self, data: bytes) -> None:
        if self.is_closed:
            raise ConnectionError()
        self.data.put_nowait(data)

    async def drain(self) -> None:
        pass

    def close(self) -> None:
        self.is_closed = True
        self.data.put_nowait(b"")

    async def wait_closed(self) -> None:
        pass

    async def receive(self) -> bytes:
        return await self.data.get()