File: test_base_protocol.py

package info (click to toggle)
python-aiohttp 3.8.4-1%2Bdeb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 14,492 kB
  • sloc: python: 41,859; ansic: 25,006; makefile: 369; javascript: 31
file content (200 lines) | stat: -rw-r--r-- 4,892 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import asyncio
from contextlib import suppress
from unittest import mock

import pytest

from aiohttp.base_protocol import BaseProtocol


async def test_loop() -> None:
    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(None)
    pr = BaseProtocol(loop)
    assert pr._loop is loop


async def test_pause_writing() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop)
    assert not pr._paused
    pr.pause_writing()
    assert pr._paused


async def test_resume_writing_no_waiters() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    pr.pause_writing()
    assert pr._paused
    pr.resume_writing()
    assert not pr._paused


async def test_connection_made() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    assert pr.transport is None
    pr.connection_made(tr)
    assert pr.transport is not None


async def test_connection_lost_not_paused() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    assert pr.connected
    pr.connection_lost(None)
    assert pr.transport is None
    assert not pr.connected


async def test_connection_lost_paused_without_waiter() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    assert pr.connected
    pr.pause_writing()
    pr.connection_lost(None)
    assert pr.transport is None
    assert not pr.connected


async def test_drain_lost() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.connection_lost(None)
    with pytest.raises(ConnectionResetError):
        await pr._drain_helper()


async def test_drain_not_paused() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    assert pr._drain_waiter is None
    await pr._drain_helper()
    assert pr._drain_waiter is None


async def test_resume_drain_waited() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    t = loop.create_task(pr._drain_helper())
    await asyncio.sleep(0)

    assert pr._drain_waiter is not None
    pr.resume_writing()
    assert (await t) is None
    assert pr._drain_waiter is None


async def test_lost_drain_waited_ok() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    t = loop.create_task(pr._drain_helper())
    await asyncio.sleep(0)

    assert pr._drain_waiter is not None
    pr.connection_lost(None)
    assert (await t) is None
    assert pr._drain_waiter is None


async def test_lost_drain_waited_exception() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    t = loop.create_task(pr._drain_helper())
    await asyncio.sleep(0)

    assert pr._drain_waiter is not None
    exc = RuntimeError()
    pr.connection_lost(exc)
    with pytest.raises(RuntimeError) as cm:
        await t
    assert cm.value is exc
    assert pr._drain_waiter is None


async def test_lost_drain_cancelled() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    fut = loop.create_future()

    async def wait():
        fut.set_result(None)
        await pr._drain_helper()

    t = loop.create_task(wait())
    await fut
    t.cancel()

    assert pr._drain_waiter is not None
    pr.connection_lost(None)
    with suppress(asyncio.CancelledError):
        await t
    assert pr._drain_waiter is None


async def test_resume_drain_cancelled() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    fut = loop.create_future()

    async def wait():
        fut.set_result(None)
        await pr._drain_helper()

    t = loop.create_task(wait())
    await fut
    t.cancel()

    assert pr._drain_waiter is not None
    pr.resume_writing()
    with suppress(asyncio.CancelledError):
        await t
    assert pr._drain_waiter is None


async def test_parallel_drain_race_condition() -> None:
    loop = asyncio.get_event_loop()
    pr = BaseProtocol(loop=loop)
    tr = mock.Mock()
    pr.connection_made(tr)
    pr.pause_writing()

    ts = [loop.create_task(pr._drain_helper()) for _ in range(5)]
    assert not (await asyncio.wait(ts, timeout=0.5))[
        0
    ], "All draining tasks must be pending"

    assert pr._drain_waiter is not None
    pr.resume_writing()
    await asyncio.gather(*ts)
    assert pr._drain_waiter is None