1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253
|
"""Tests for asyncio/timeouts.py"""
import unittest
import time
import asyncio
from asyncio import tasks
def tearDownModule():
    # unittest calls this hook once after every test in the module has run.
    # Passing None resets the event loop policy to the default so that a
    # policy installed while these tests ran does not leak into other modules.
    asyncio.set_event_loop_policy(None)
class TimeoutTests(unittest.IsolatedAsyncioTestCase):
async def test_timeout_basic(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01) as cm:
await asyncio.sleep(10)
self.assertTrue(cm.expired())
async def test_timeout_at_basic(self):
loop = asyncio.get_running_loop()
with self.assertRaises(TimeoutError):
deadline = loop.time() + 0.01
async with asyncio.timeout_at(deadline) as cm:
await asyncio.sleep(10)
self.assertTrue(cm.expired())
self.assertEqual(deadline, cm.when())
async def test_nested_timeouts(self):
loop = asyncio.get_running_loop()
cancelled = False
with self.assertRaises(TimeoutError):
deadline = loop.time() + 0.01
async with asyncio.timeout_at(deadline) as cm1:
# Only the topmost context manager should raise TimeoutError
try:
async with asyncio.timeout_at(deadline) as cm2:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise
self.assertTrue(cancelled)
self.assertTrue(cm1.expired())
self.assertTrue(cm2.expired())
async def test_waiter_cancelled(self):
loop = asyncio.get_running_loop()
cancelled = False
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
try:
await asyncio.sleep(10)
except asyncio.CancelledError:
cancelled = True
raise
self.assertTrue(cancelled)
async def test_timeout_not_called(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout(10) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()
self.assertFalse(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertGreater(cm.when(), t1)
async def test_timeout_disabled(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout(None) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()
self.assertFalse(cm.expired())
self.assertIsNone(cm.when())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
async def test_timeout_at_disabled(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
async with asyncio.timeout_at(None) as cm:
await asyncio.sleep(0.01)
t1 = loop.time()
self.assertFalse(cm.expired())
self.assertIsNone(cm.when())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
async def test_timeout_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0) as cm:
await asyncio.sleep(10)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 <= cm.when() <= t1)
async def test_timeout_zero_sleep_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0) as cm:
await asyncio.sleep(0)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 <= cm.when() <= t1)
async def test_timeout_in_the_past_sleep_zero(self):
loop = asyncio.get_running_loop()
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(-11) as cm:
await asyncio.sleep(0)
t1 = loop.time()
self.assertTrue(cm.expired())
# 2 sec for slow CI boxes
self.assertLess(t1-t0, 2)
self.assertTrue(t0 >= cm.when() <= t1)
async def test_foreign_exception_passed(self):
with self.assertRaises(KeyError):
async with asyncio.timeout(0.01) as cm:
raise KeyError
self.assertFalse(cm.expired())
async def test_foreign_exception_on_timeout(self):
async def crash():
try:
await asyncio.sleep(1)
finally:
1/0
with self.assertRaises(ZeroDivisionError):
async with asyncio.timeout(0.01):
await crash()
async def test_foreign_cancel_doesnt_timeout_if_not_expired(self):
with self.assertRaises(asyncio.CancelledError):
async with asyncio.timeout(10) as cm:
asyncio.current_task().cancel()
await asyncio.sleep(10)
self.assertFalse(cm.expired())
async def test_outer_task_is_not_cancelled(self):
async def outer() -> None:
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.001):
await asyncio.sleep(10)
task = asyncio.create_task(outer())
await task
self.assertFalse(task.cancelled())
self.assertTrue(task.done())
async def test_nested_timeouts_concurrent(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.002):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.1):
# Pretend we crunch some numbers.
time.sleep(0.01)
await asyncio.sleep(1)
async def test_nested_timeouts_loop_busy(self):
# After the inner timeout is an expensive operation which should
# be stopped by the outer timeout.
loop = asyncio.get_running_loop()
# Disable a message about long running task
loop.slow_callback_duration = 10
t0 = loop.time()
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.1): # (1)
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01): # (2)
# Pretend the loop is busy for a while.
time.sleep(0.1)
await asyncio.sleep(1)
# TimeoutError was cought by (2)
await asyncio.sleep(10) # This sleep should be interrupted by (1)
t1 = loop.time()
self.assertTrue(t0 <= t1 <= t0 + 1)
async def test_reschedule(self):
loop = asyncio.get_running_loop()
fut = loop.create_future()
deadline1 = loop.time() + 10
deadline2 = deadline1 + 20
async def f():
async with asyncio.timeout_at(deadline1) as cm:
fut.set_result(cm)
await asyncio.sleep(50)
task = asyncio.create_task(f())
cm = await fut
self.assertEqual(cm.when(), deadline1)
cm.reschedule(deadline2)
self.assertEqual(cm.when(), deadline2)
cm.reschedule(None)
self.assertIsNone(cm.when())
task.cancel()
with self.assertRaises(asyncio.CancelledError):
await task
self.assertFalse(cm.expired())
async def test_repr_active(self):
async with asyncio.timeout(10) as cm:
self.assertRegex(repr(cm), r"<Timeout \[active\] when=\d+\.\d*>")
async def test_repr_expired(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01) as cm:
await asyncio.sleep(10)
self.assertEqual(repr(cm), "<Timeout [expired]>")
async def test_repr_finished(self):
async with asyncio.timeout(10) as cm:
await asyncio.sleep(0)
self.assertEqual(repr(cm), "<Timeout [finished]>")
async def test_repr_disabled(self):
async with asyncio.timeout(None) as cm:
self.assertEqual(repr(cm), r"<Timeout [active] when=None>")
async def test_nested_timeout_in_finally(self):
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
try:
await asyncio.sleep(1)
finally:
with self.assertRaises(TimeoutError):
async with asyncio.timeout(0.01):
await asyncio.sleep(10)
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|