# Tests for the buffered byte stream wrappers in anyio.streams.buffered.
from __future__ import annotations
import pytest
from anyio import (
ClosedResourceError,
EndOfStream,
IncompleteRead,
create_memory_object_stream,
)
from anyio.abc import ObjectStream, ObjectStreamConnectable
from anyio.streams.buffered import (
BufferedByteReceiveStream,
BufferedByteStream,
BufferedConnectable,
)
from anyio.streams.stapled import StapledObjectStream
async def test_receive_exactly() -> None:
    """Check that ``receive_exactly()`` joins several chunks into one result.

    Two 4-byte chunks are queued on the memory stream and a single 8-byte read
    must return them concatenated as a ``bytes`` object.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](2)
    # Use the streams as context managers so both ends are closed even when
    # an assertion below fails.
    with send_stream, receive_stream:
        buffered_stream = BufferedByteReceiveStream(receive_stream)
        await send_stream.send(b"abcd")
        await send_stream.send(b"efgh")

        result = await buffered_stream.receive_exactly(8)
        assert result == b"abcdefgh"
        assert isinstance(result, bytes)
async def test_receive_exactly_incomplete() -> None:
    """Check that ``receive_exactly()`` raises ``IncompleteRead`` on a short read.

    Only 4 bytes are sent before the sending end is closed, so a request for
    8 bytes cannot be satisfied.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # Use the streams as context managers so both ends are closed even when
    # the expectation below fails.
    with send_stream, receive_stream:
        buffered_stream = BufferedByteReceiveStream(receive_stream)
        await send_stream.send(b"abcd")
        # Closing the sending end is what makes the read come up short.
        await send_stream.aclose()

        with pytest.raises(IncompleteRead):
            await buffered_stream.receive_exactly(8)
async def test_receive_until() -> None:
    """Check that ``receive_until()`` returns the bytes preceding a delimiter.

    The first delimiter (``b"de"``) spans the two sent chunks; the second call
    continues from the data left over after the first one.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](2)
    # Use the streams as context managers so both ends are closed even when
    # an assertion below fails.
    with send_stream, receive_stream:
        buffered_stream = BufferedByteReceiveStream(receive_stream)
        await send_stream.send(b"abcd")
        await send_stream.send(b"efgh")

        result = await buffered_stream.receive_until(b"de", 10)
        assert result == b"abc"
        assert isinstance(result, bytes)

        result = await buffered_stream.receive_until(b"h", 10)
        assert result == b"fg"
        assert isinstance(result, bytes)
async def test_receive_until_incomplete() -> None:
    """Check that ``receive_until()`` raises ``IncompleteRead`` without a delimiter.

    The sending end is closed before the delimiter ``b"de"`` has been sent.
    After the failure the received bytes must still be in the buffer.
    """
    send_stream, receive_stream = create_memory_object_stream[bytes](1)
    # Use the streams as context managers so both ends are closed even when
    # an expectation below fails.
    with send_stream, receive_stream:
        buffered_stream = BufferedByteReceiveStream(receive_stream)
        await send_stream.send(b"abcd")
        await send_stream.aclose()

        # The call is expected to raise, so its return value is not asserted.
        with pytest.raises(IncompleteRead):
            await buffered_stream.receive_until(b"de", 10)

        assert buffered_stream.buffer == b"abcd"
async def test_buffered_stream() -> None:
    """Exercise receiving, ``send_eof()`` and ``aclose()`` on a buffered stream.

    A ``BufferedByteStream`` wraps a stapled pair of memory object streams; the
    underlying streams are probed directly to see which ends were closed.
    """
    tx, rx = create_memory_object_stream[bytes](1)
    stapled = StapledObjectStream(tx, rx)
    buffered = BufferedByteStream(stapled)

    await tx.send(b"abcd")
    first_half = await buffered.receive_exactly(2)
    assert first_half == b"ab"
    second_half = await buffered.receive_exactly(2)
    assert second_half == b"cd"

    # send_eof() should close only the sending end
    await buffered.send_eof()
    with pytest.raises(ClosedResourceError):
        tx.send_nowait(b"abc")
    with pytest.raises(EndOfStream):
        rx.receive_nowait()

    # aclose() closes the receive stream too
    await buffered.aclose()
    with pytest.raises(ClosedResourceError):
        rx.receive_nowait()
async def test_buffered_connectable() -> None:
    """Check that ``BufferedConnectable.connect()`` yields a ``BufferedByteStream``.

    The wrapped connectable hands back a stapled memory object stream, so the
    bytes sent through the buffered stream can be read back from it.
    """
    tx, rx = create_memory_object_stream[bytes](1)
    memory_stream = StapledObjectStream(tx, rx)

    class MemoryObjectConnectable(ObjectStreamConnectable[bytes]):
        # Always returns the single stapled stream created above.
        async def connect(self) -> ObjectStream[bytes]:
            return memory_stream

    inner = MemoryObjectConnectable()
    connectable = BufferedConnectable(inner)
    connected = await connectable.connect()
    async with connected as stream:
        assert isinstance(stream, BufferedByteStream)
        await stream.send(b"abcd")
        head = await stream.receive_exactly(2)
        assert head == b"ab"
        tail = await stream.receive_exactly(2)
        assert tail == b"cd"
async def test_feed_data() -> None:
    """Check that data passed to ``feed_data()`` is returned before stream data."""
    tx, rx = create_memory_object_stream[bytes](1)
    stapled = StapledObjectStream(tx, rx)
    buffered = BufferedByteStream(stapled)
    tx.send_nowait(b"abcd")

    # The stream has not received the sent data yet, so b"xxx" should come out of the
    # buffer first, despite this order of data input
    for chunk in (b"xxx", b"foo"):
        buffered.feed_data(chunk)

    received = await buffered.receive_exactly(10)
    assert received == b"xxxfooabcd"
    await buffered.aclose()
|