1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
|
from unittest import mock
import pytest
from aiohttp import streams
@pytest.fixture
def protocol():
    """Provide a mock protocol whose ``_reading_paused`` flag starts as False."""
    fake_protocol = mock.Mock()
    fake_protocol._reading_paused = False
    return fake_protocol
@pytest.fixture
def stream(loop, protocol):
    """Provide a ``StreamReader`` with ``limit=1`` bound to the mock protocol.

    ``_allow_pause`` is switched on so the reader is permitted to pause the
    protocol during the tests.
    """
    reader = streams.StreamReader(protocol, limit=1, loop=loop)
    reader._allow_pause = True
    return reader
@pytest.fixture
def buffer(loop, protocol):
    """Provide a ``FlowControlDataQueue`` with ``limit=1`` on the mock protocol.

    ``_allow_pause`` is switched on so the queue is permitted to pause the
    protocol during the tests.
    """
    queue = streams.FlowControlDataQueue(protocol, limit=1, loop=loop)
    queue._allow_pause = True
    return queue
class TestFlowControlStreamReader:
    """Check when a ``StreamReader`` pauses and resumes its protocol.

    Every test receives the ``stream`` fixture and inspects the calls
    recorded on the mock stored in ``stream._protocol``.
    """

    async def test_read(self, stream) -> None:
        """A partial read with the protocol not paused must not resume it."""
        proto = stream._protocol
        stream.feed_data(b'da', 2)

        chunk = await stream.read(1)

        assert chunk == b'd'
        assert not proto.resume_reading.called

    async def test_read_resume_paused(self, stream) -> None:
        """Read one byte after the protocol has been flagged as paused.

        NOTE(review): despite the test name, the assertion checks
        ``pause_reading``; presumably that call is recorded by ``feed_data``
        before the paused flag is flipped -- confirm the intent.
        """
        proto = stream._protocol
        # The data is fed *before* the flag is set; the order matters.
        stream.feed_data(b'test', 4)
        proto._reading_paused = True

        chunk = await stream.read(1)

        assert chunk == b't'
        assert proto.pause_reading.called

    async def test_readline(self, stream) -> None:
        """Reading a full line with the protocol not paused must not resume."""
        proto = stream._protocol
        # NOTE(review): the size argument (5) differs from len(data) (2);
        # presumably it does not influence the result -- confirm.
        stream.feed_data(b'd\n', 5)

        line = await stream.readline()

        assert line == b'd\n'
        assert not proto.resume_reading.called

    async def test_readline_resume_paused(self, stream) -> None:
        """Draining the buffer via ``readline`` resumes a paused protocol."""
        proto = stream._protocol
        proto._reading_paused = True
        stream.feed_data(b'd\n', 5)

        line = await stream.readline()

        assert line == b'd\n'
        assert proto.resume_reading.called

    async def test_readany(self, stream) -> None:
        """``readany`` with the protocol not paused must not resume it."""
        proto = stream._protocol
        stream.feed_data(b'data', 4)

        payload = await stream.readany()

        assert payload == b'data'
        assert not proto.resume_reading.called

    async def test_readany_resume_paused(self, stream) -> None:
        """Draining the buffer via ``readany`` resumes a paused protocol."""
        proto = stream._protocol
        proto._reading_paused = True
        stream.feed_data(b'data', 4)

        payload = await stream.readany()

        assert payload == b'data'
        assert proto.resume_reading.called

    async def test_readchunk(self, stream) -> None:
        """``readchunk`` with the protocol not paused must not resume it."""
        proto = stream._protocol
        stream.feed_data(b'data', 4)

        payload, chunk_ended = await stream.readchunk()

        assert payload == b'data'
        assert not chunk_ended
        assert not proto.resume_reading.called

    async def test_readchunk_resume_paused(self, stream) -> None:
        """Draining the buffer via ``readchunk`` resumes a paused protocol."""
        proto = stream._protocol
        proto._reading_paused = True
        stream.feed_data(b'data', 4)

        payload, chunk_ended = await stream.readchunk()

        assert payload == b'data'
        assert not chunk_ended
        assert proto.resume_reading.called

    async def test_readexactly(self, stream) -> None:
        """``readexactly`` with the protocol not paused must not resume it."""
        proto = stream._protocol
        stream.feed_data(b'data', 4)

        payload = await stream.readexactly(3)

        assert payload == b'dat'
        assert not proto.resume_reading.called

    async def test_feed_data(self, stream) -> None:
        """Feeding eight bytes into the ``limit=1`` reader pauses the protocol."""
        proto = stream._protocol
        proto._reading_paused = False

        stream.feed_data(b'datadata', 8)

        assert proto.pause_reading.called

    async def test_read_nowait(self, stream) -> None:
        """Resume is triggered once, by the read that empties the buffer."""
        proto = stream._protocol
        proto._reading_paused = True
        for piece in (b'data1', b'data2', b'data3'):
            stream.feed_data(piece, 5)

        # First chunk is taken through the awaitable API.
        first = await stream.read(5)
        assert first == b'data1'
        assert proto.resume_reading.call_count == 0

        # The remaining chunks are taken synchronously; only the last one,
        # which leaves nothing buffered, is expected to resume the protocol.
        for expected, resume_calls in ((b'data2', 0), (b'data3', 1)):
            assert stream.read_nowait(5) == expected
            assert proto.resume_reading.call_count == resume_calls

        # With the protocol no longer paused, an empty read adds no call.
        proto._reading_paused = False
        assert stream.read_nowait(5) == b''
        assert proto.resume_reading.call_count == 1
class TestFlowControlDataQueue:
    """Check when a ``FlowControlDataQueue`` pauses and resumes its protocol.

    Both tests receive the ``buffer`` fixture and inspect the calls recorded
    on the mock stored in ``buffer._protocol``.
    """

    def test_feed_pause(self, buffer) -> None:
        """Feeding an item of size 100 into the ``limit=1`` queue pauses reading."""
        proto = buffer._protocol
        proto._reading_paused = False

        buffer.feed_data(object(), 100)

        assert proto.pause_reading.called

    async def test_resume_on_read(self, buffer) -> None:
        """Reading the only queued item resumes a paused protocol."""
        proto = buffer._protocol
        # The item is fed *before* the flag is set; the order matters.
        buffer.feed_data(object(), 100)
        proto._reading_paused = True

        await buffer.read()

        assert proto.resume_reading.called
|