# Tests for distributed._async_taskgroup.AsyncTaskGroup.
from __future__ import annotations
import asyncio
import time as timemod
import pytest
from distributed._async_taskgroup import AsyncTaskGroup, AsyncTaskGroupClosedError
from distributed.utils_test import gen_test
async def _wait_for_n_loop_cycles(n):
for _ in range(n):
await asyncio.sleep(0)
def test_async_task_group_initialization():
    """A freshly constructed group is open and tracks no tasks."""
    fresh_group = AsyncTaskGroup()

    is_closed = fresh_group.closed
    assert not is_closed

    tracked = len(fresh_group)
    assert tracked == 0
@gen_test()
async def test_async_task_group_call_soon_executes_task_in_background():
    """``call_soon`` returns immediately and the coroutine runs in the background."""
    task_group = AsyncTaskGroup()
    gate = asyncio.Event()
    ran = False

    async def mark_ran():
        # Block until the test opens the gate, then record that we ran.
        nonlocal ran
        await gate.wait()
        ran = True

    scheduled = task_group.call_soon(mark_ran)
    assert scheduled is None
    # The task is parked on the gate, so the group still tracks it.
    assert len(task_group) == 1

    gate.set()
    # Yield to the loop so the task can finish and leave the group.
    await _wait_for_n_loop_cycles(2)
    assert len(task_group) == 0
    assert ran
@gen_test()
async def test_async_task_group_call_later_executes_delayed_task_in_background():
    """``call_later`` returns immediately and fires the callable after the delay."""
    task_group = AsyncTaskGroup()
    fired = asyncio.Event()

    started_at = timemod.monotonic()
    scheduled = task_group.call_later(1, fired.set)
    assert scheduled is None
    assert len(task_group) == 1

    await fired.wait()
    finished_at = timemod.monotonic()

    # Yield to the loop twice so the finished task can leave the group.
    await _wait_for_n_loop_cycles(2)
    assert len(task_group) == 0

    # Allow for the granularity of the monotonic clock when checking the delay.
    elapsed = finished_at - started_at
    clock_resolution = timemod.get_clock_info("monotonic").resolution
    assert elapsed > 1 - clock_resolution
def test_async_task_group_close_closes():
    """``close`` marks the group as closed and may be called repeatedly."""
    task_group = AsyncTaskGroup()

    # The second pass exercises idempotency: closing a closed group is fine.
    for _attempt in range(2):
        task_group.close()
        assert task_group.closed
@gen_test()
async def test_async_task_group_close_does_not_cancel_existing_tasks():
    """Closing the group leaves already-scheduled tasks free to complete.

    A background task is parked on an event, the group is closed while the
    task is still pending, and the event is then released.  The task must
    leave the group *and* must have executed its body, i.e. it finished
    normally instead of being cancelled by ``close``.
    """
    group = AsyncTaskGroup()
    ev = asyncio.Event()
    flag = False

    async def set_flag():
        nonlocal flag
        await ev.wait()
        flag = True
        return None

    assert group.call_soon(set_flag) is None
    group.close()
    # Closing must not drop the still-pending task from the group.
    assert len(group) == 1

    ev.set()
    await _wait_for_n_loop_cycles(2)
    assert len(group) == 0
    # An empty group alone would also be consistent with a cancelled task;
    # the flag proves the coroutine actually ran to completion.
    assert flag
@gen_test()
async def test_async_task_group_close_prohibits_new_tasks():
    """A closed group rejects both ``call_soon`` and ``call_later``.

    Each scheduling attempt must raise ``AsyncTaskGroupClosedError`` and
    leave the group empty, and the rejected coroutine function must never
    be executed.
    """
    group = AsyncTaskGroup()
    group.close()
    ev = asyncio.Event()
    flag = False

    async def set_flag():
        nonlocal flag
        await ev.wait()
        flag = True
        return True

    with pytest.raises(AsyncTaskGroupClosedError):
        group.call_soon(set_flag)
    assert len(group) == 0

    with pytest.raises(AsyncTaskGroupClosedError):
        group.call_later(1, set_flag)
    assert len(group) == 0

    # Release the event before waiting: with the event left unset, set_flag
    # could never flip the flag even if it had been scheduled by mistake,
    # which would make the final assertion vacuous.
    ev.set()
    await asyncio.sleep(0.01)
    assert not flag
@gen_test()
async def test_async_task_group_stop_disallows_shutdown():
    """Stopping right after scheduling keeps the task body from ever running."""
    task_group = AsyncTaskGroup()
    observed_task = None

    async def record_current_task():
        # Would capture the running task if the body were ever entered.
        nonlocal observed_task
        observed_task = asyncio.current_task()

    scheduled = task_group.call_soon(record_current_task)
    assert scheduled is None
    assert len(task_group) == 1

    # No grace period is granted: when the group is stopped immediately,
    # the scheduled task is not even allowed to start.
    await task_group.stop()
    assert observed_task is None
@gen_test()
async def test_async_task_group_stop_cancels_long_running():
    """``stop`` cancels a task that has started and is still sleeping."""
    task_group = AsyncTaskGroup()
    running_task = None
    finished = False
    entered = asyncio.Event()

    async def sleep_then_finish():
        nonlocal running_task, finished
        running_task = asyncio.current_task()
        # Tell the test that the body has been entered before sleeping.
        entered.set()
        await asyncio.sleep(10)
        finished = True
        return True

    scheduled = task_group.call_soon(sleep_then_finish)
    assert scheduled is None
    assert len(task_group) == 1

    # Make sure the task is really running before stopping the group.
    await entered.wait()
    await task_group.stop()

    assert running_task
    assert running_task.cancelled()
    # The code after the long sleep must never have been reached.
    assert not finished
|