1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
|
from __future__ import annotations
import platform
import sys
import pytest
from anyio import create_memory_object_stream
from anyio.abc import ObjectStream, ObjectStreamConnectable
from anyio.streams.stapled import StapledObjectStream
from anyio.streams.text import (
TextConnectable,
TextReceiveStream,
TextSendStream,
TextStream,
)
async def test_receive() -> None:
    """Check that ``TextReceiveStream`` decodes a character split across chunks.

    The first chunk ends with only the first byte of a two-byte UTF-8
    sequence; the test expects that partial character to be held back and
    delivered once the remaining byte has been sent.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # The context managers close both ends even if an assertion below fails,
    # so a failing test does not leave unclosed streams behind.
    with send_stream, receive_stream:
        text_stream = TextReceiveStream(receive_stream)

        # This chunk ends with half of the "ö" letter
        await send_stream.send(b"\xc3\xa5\xc3\xa4\xc3")
        assert await text_stream.receive() == "åä"

        # Send the missing byte for "ö"
        await send_stream.send(b"\xb6")
        assert await text_stream.receive() == "ö"
async def test_send() -> None:
    """Check that ``TextSendStream`` delivers text as UTF-8 encoded bytes.

    No encoding is passed to the wrapper; the test expects the bytes seen on
    the underlying stream to be the UTF-8 encoding of the sent string.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # The context managers close both ends even if the assertion below fails.
    with send_stream, receive_stream:
        text_stream = TextSendStream(send_stream)
        await text_stream.send("åäö")
        assert await receive_stream.receive() == b"\xc3\xa5\xc3\xa4\xc3\xb6"
@pytest.mark.xfail(
    platform.python_implementation() == "PyPy"
    and sys.pypy_version_info < (7, 3, 2),  # type: ignore[attr-defined]
    reason="PyPy has a bug in its incremental UTF-8 decoder (#3274)",
)
async def test_receive_encoding_error() -> None:
    """Check that ``errors="replace"`` substitutes undecodable input bytes.

    Three latin-1 encoded letters are fed to a receive stream created with
    ``errors="replace"``; the test expects three replacement characters
    (U+FFFD) instead of a decoding exception.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # The context managers close both ends even if the assertion below fails.
    with send_stream, receive_stream:
        text_stream = TextReceiveStream(receive_stream, errors="replace")
        await send_stream.send(b"\xe5\xe4\xf6")  # "åäö" in latin-1
        # The expected value is three U+FFFD replacement characters
        assert await text_stream.receive() == "���"
async def test_send_encoding_error() -> None:
    """Check that ``errors="replace"`` substitutes unencodable output text.

    The euro sign is sent through a stream configured for ISO-8859-1 with
    ``errors="replace"``; the test expects a single ``b"?"`` on the
    underlying stream instead of an encoding exception.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # The context managers close both ends even if the assertion below fails.
    with send_stream, receive_stream:
        text_stream = TextSendStream(
            send_stream, encoding="iso-8859-1", errors="replace"
        )
        await text_stream.send("€")
        assert await receive_stream.receive() == b"?"
async def test_bidirectional_stream() -> None:
    """Check that ``TextStream`` encodes on send and decodes on receive.

    The text stream wraps a stapled pair of memory object streams. The test
    verifies the bytes produced by ``send()``, the text produced by
    ``receive()``, and that ``extra_attributes`` is an empty mapping.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # The context managers close both ends even if an assertion below fails.
    with send_stream, receive_stream:
        stapled_stream = StapledObjectStream(send_stream, receive_stream)
        text_stream = TextStream(stapled_stream)

        # Outgoing direction: text in, encoded bytes out
        await text_stream.send("åäö")
        assert await receive_stream.receive() == b"\xc3\xa5\xc3\xa4\xc3\xb6"

        # Incoming direction: encoded bytes in, text out
        await send_stream.send(b"\xc3\xa6\xc3\xb8")
        assert await text_stream.receive() == "æø"

        assert text_stream.extra_attributes == {}
async def test_text_connectable() -> None:
    """Check that ``TextConnectable.connect()`` yields a working ``TextStream``.

    A connectable that returns a stapled in-memory stream is wrapped in
    ``TextConnectable``; the test expects the connected stream to be a
    ``TextStream`` and to read back the text that was sent through it.
    """
    sender, receiver = create_memory_object_stream[bytes](1)
    loopback = StapledObjectStream(sender, receiver)

    class LoopbackConnectable(ObjectStreamConnectable[bytes]):
        """Connectable whose ``connect()`` returns the stapled memory stream."""

        async def connect(self) -> ObjectStream[bytes]:
            return loopback

    text_connectable = TextConnectable(LoopbackConnectable())
    connected = await text_connectable.connect()
    async with connected as stream:
        assert isinstance(stream, TextStream)
        await stream.send("hello")
        echoed = await stream.receive()
        assert echoed == "hello"
|