1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
|
from __future__ import annotations
import asyncio
from typing import Optional, Union
from ..helpers import MockSocket
class MockSSLObject:
def selected_alpn_protocol(self) -> str:
return "h2"
class MemoryReader:
def __init__(self) -> None:
self.data: asyncio.Queue = asyncio.Queue()
self.eof = False
async def send(self, data: bytes) -> None:
if data != b"":
await self.data.put(data)
async def read(self, length: int) -> bytes:
return await self.data.get()
def close(self) -> None:
self.data.put_nowait(b"")
self.eof = True
def at_eof(self) -> bool:
return self.eof and self.data.empty()
class MemoryWriter:
def __init__(self, http2: bool = False) -> None:
self.is_closed = False
self.data: asyncio.Queue = asyncio.Queue()
self.http2 = http2
def get_extra_info(self, name: str) -> Optional[Union[MockSocket, MockSSLObject]]:
if name == "socket":
return MockSocket()
elif self.http2 and name == "ssl_object":
return MockSSLObject()
else:
return None
def write_eof(self) -> None:
self.data.put_nowait(b"")
def write(self, data: bytes) -> None:
if self.is_closed:
raise ConnectionError()
self.data.put_nowait(data)
async def drain(self) -> None:
pass
def close(self) -> None:
self.is_closed = True
self.data.put_nowait(b"")
async def wait_closed(self) -> None:
pass
async def receive(self) -> bytes:
return await self.data.get()
|