File: test_flowcontrol_streams.py

package info (click to toggle)
python-aiohttp 3.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 5,612 kB
  • sloc: python: 36,917; ansic: 15,734; makefile: 365; sh: 83
file content (131 lines) | stat: -rw-r--r-- 4,185 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from unittest import mock

import pytest

from aiohttp import streams


@pytest.fixture
def protocol():
    """Provide a mock protocol whose ``_reading_paused`` flag is False."""
    fake_protocol = mock.Mock()
    fake_protocol._reading_paused = False
    return fake_protocol


@pytest.fixture
def stream(loop, protocol):
    """Build a ``StreamReader`` (``limit=1``) on the mock protocol."""
    reader = streams.StreamReader(protocol, loop=loop, limit=1)
    # NOTE(review): presumably a flow-control switch on the reader; confirm
    # the stream implementation still consults this attribute.
    reader._allow_pause = True
    return reader


@pytest.fixture
def buffer(loop, protocol):
    """Build a ``FlowControlDataQueue`` (``limit=1``) on the mock protocol."""
    queue = streams.FlowControlDataQueue(protocol, loop=loop, limit=1)
    # NOTE(review): presumably a flow-control switch on the queue; confirm
    # the queue implementation still consults this attribute.
    queue._allow_pause = True
    return queue


class TestFlowControlStreamReader:
    """Flow-control tests for ``streams.StreamReader``.

    Each test receives the ``stream`` fixture. The stream's protocol is a
    ``mock.Mock``, so the tests can check whether feeding or reading data
    called the protocol's ``pause_reading`` / ``resume_reading``.
    """

    async def test_read(self, stream) -> None:
        """A partial read on an unpaused protocol must not resume it."""
        stream.feed_data(b'da', 2)
        res = await stream.read(1)
        assert res == b'd'
        assert not stream._protocol.resume_reading.called

    async def test_read_resume_paused(self, stream) -> None:
        """A paused protocol is resumed once ``read`` drains the buffer."""
        stream.feed_data(b'test', 4)
        # The pause request comes from the feed above, not from any read.
        assert stream._protocol.pause_reading.called
        stream._protocol._reading_paused = True

        res = await stream.read(1)
        assert res == b't'
        # Three bytes are still buffered, so the protocol stays paused.
        assert not stream._protocol.resume_reading.called

        # Draining the remaining bytes is what must trigger the resume.
        res = await stream.read(3)
        assert res == b'est'
        assert stream._protocol.resume_reading.called

    async def test_readline(self, stream) -> None:
        """``readline`` on an unpaused protocol must not resume it."""
        # The size argument matches len(data), like every other feed here.
        stream.feed_data(b'd\n', 2)
        res = await stream.readline()
        assert res == b'd\n'
        assert not stream._protocol.resume_reading.called

    async def test_readline_resume_paused(self, stream) -> None:
        """``readline`` that drains the buffer resumes a paused protocol."""
        stream._protocol._reading_paused = True
        # The size argument matches len(data), like every other feed here.
        stream.feed_data(b'd\n', 2)
        res = await stream.readline()
        assert res == b'd\n'
        assert stream._protocol.resume_reading.called

    async def test_readany(self, stream) -> None:
        """``readany`` on an unpaused protocol must not resume it."""
        stream.feed_data(b'data', 4)
        res = await stream.readany()
        assert res == b'data'
        assert not stream._protocol.resume_reading.called

    async def test_readany_resume_paused(self, stream) -> None:
        """``readany`` that drains the buffer resumes a paused protocol."""
        stream._protocol._reading_paused = True
        stream.feed_data(b'data', 4)
        res = await stream.readany()
        assert res == b'data'
        assert stream._protocol.resume_reading.called

    async def test_readchunk(self, stream) -> None:
        """``readchunk`` on an unpaused protocol must not resume it."""
        stream.feed_data(b'data', 4)
        res, end_of_http_chunk = await stream.readchunk()
        assert res == b'data'
        assert not end_of_http_chunk
        assert not stream._protocol.resume_reading.called

    async def test_readchunk_resume_paused(self, stream) -> None:
        """``readchunk`` that drains the buffer resumes a paused protocol."""
        stream._protocol._reading_paused = True
        stream.feed_data(b'data', 4)
        res, end_of_http_chunk = await stream.readchunk()
        assert res == b'data'
        assert not end_of_http_chunk
        assert stream._protocol.resume_reading.called

    async def test_readexactly(self, stream) -> None:
        """``readexactly`` on an unpaused protocol must not resume it."""
        stream.feed_data(b'data', 4)
        res = await stream.readexactly(3)
        assert res == b'dat'
        assert not stream._protocol.resume_reading.called

    async def test_feed_data(self, stream) -> None:
        """Feeding 8 bytes while unpaused must pause the protocol."""
        stream._protocol._reading_paused = False
        stream.feed_data(b'datadata', 8)
        assert stream._protocol.pause_reading.called

    async def test_read_nowait(self, stream) -> None:
        """The protocol is resumed exactly once, when the buffer empties."""
        stream._protocol._reading_paused = True
        stream.feed_data(b'data1', 5)
        stream.feed_data(b'data2', 5)
        stream.feed_data(b'data3', 5)
        res = await stream.read(5)
        assert res == b'data1'
        assert stream._protocol.resume_reading.call_count == 0

        res = stream.read_nowait(5)
        assert res == b'data2'
        assert stream._protocol.resume_reading.call_count == 0

        # Reading the last chunk empties the buffer: first and only resume.
        res = stream.read_nowait(5)
        assert res == b'data3'
        assert stream._protocol.resume_reading.call_count == 1

        # With the protocol no longer paused, an empty read must not
        # trigger another resume.
        stream._protocol._reading_paused = False
        res = stream.read_nowait(5)
        assert res == b''
        assert stream._protocol.resume_reading.call_count == 1


class TestFlowControlDataQueue:
    """Pause/resume checks for ``streams.FlowControlDataQueue``."""

    def test_feed_pause(self, buffer) -> None:
        """Feeding an item with declared size 100 must pause the protocol."""
        proto = buffer._protocol
        proto._reading_paused = False

        item = object()
        buffer.feed_data(item, 100)

        assert proto.pause_reading.called

    async def test_resume_on_read(self, buffer) -> None:
        """Reading the queued item back must resume a paused protocol."""
        proto = buffer._protocol
        item = object()
        buffer.feed_data(item, 100)

        # The protocol is flagged as paused only after the feed.
        proto._reading_paused = True
        await buffer.read()

        assert proto.resume_reading.called