File: test_async_task_group.py

package info (click to toggle)
dask.distributed 2024.12.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (159 lines) | stat: -rw-r--r-- 3,547 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from __future__ import annotations

import asyncio
import time as timemod

import pytest

from distributed._async_taskgroup import AsyncTaskGroup, AsyncTaskGroupClosedError
from distributed.utils_test import gen_test


async def _wait_for_n_loop_cycles(n):
    """Suspend the caller ``n`` times via ``asyncio.sleep(0)``.

    Each zero-length sleep hands control back to the event loop once, which
    lets other ready callbacks and tasks make progress before the caller
    resumes.
    """
    for _cycle in range(n):
        await asyncio.sleep(0)


def test_async_task_group_initialization():
    """A freshly constructed group is open and contains no tasks."""
    fresh_group = AsyncTaskGroup()

    is_closed = fresh_group.closed
    assert not is_closed

    task_count = len(fresh_group)
    assert task_count == 0


@gen_test()
async def test_async_task_group_call_soon_executes_task_in_background():
    """``call_soon`` schedules a coroutine function that later runs to the end.

    The scheduled task blocks on an event so the test can observe it while
    it is still counted by the group, then release it and check that it both
    finished and left the group.
    """
    task_group = AsyncTaskGroup()
    release = asyncio.Event()
    ran_to_completion = False

    async def wait_then_mark_done():
        nonlocal ran_to_completion
        await release.wait()
        ran_to_completion = True

    # call_soon returns nothing; the task is only visible through the size
    # of the group.
    scheduled = task_group.call_soon(wait_then_mark_done)
    assert scheduled is None
    assert len(task_group) == 1

    # Unblock the task and yield to the loop twice before inspecting the
    # outcome.
    release.set()
    await _wait_for_n_loop_cycles(2)
    assert len(task_group) == 0
    assert ran_to_completion


@gen_test()
async def test_async_task_group_call_later_executes_delayed_task_in_background():
    """``call_later`` invokes the callback only once the delay has elapsed.

    The callback sets an event; the time between scheduling and the event
    firing is compared against the requested delay, allowing for the
    resolution of the monotonic clock.
    """
    delay = 1
    task_group = AsyncTaskGroup()
    fired = asyncio.Event()

    scheduled_at = timemod.monotonic()
    assert task_group.call_later(delay, fired.set) is None
    assert len(task_group) == 1

    await fired.wait()
    fired_at = timemod.monotonic()

    # Yield to the loop twice so the finished task can drop out of the group
    # before its size is checked.
    await _wait_for_n_loop_cycles(2)
    assert len(task_group) == 0

    clock_resolution = timemod.get_clock_info("monotonic").resolution
    assert fired_at - scheduled_at > delay - clock_resolution


def test_async_task_group_close_closes():
    """``close`` marks the group as closed and may be called repeatedly."""
    task_group = AsyncTaskGroup()

    # The second pass checks idempotency: closing an already-closed group
    # must leave it closed.
    for _attempt in range(2):
        task_group.close()
        assert task_group.closed


@gen_test()
async def test_async_task_group_close_does_not_cancel_existing_tasks():
    """Closing a group leaves an already-scheduled task free to finish.

    A task that blocks on an event is scheduled, the group is closed, and
    only then is the event set. The task must still be counted by the group
    right after ``close()``, must run to completion once released, and must
    then drop out of the group.
    """
    group = AsyncTaskGroup()

    ev = asyncio.Event()
    flag = False

    async def set_flag():
        nonlocal flag
        await ev.wait()
        flag = True

    assert group.call_soon(set_flag) is None

    group.close()

    # close() must not discard the task that was scheduled before it.
    assert len(group) == 1

    ev.set()
    await _wait_for_n_loop_cycles(2)
    assert len(group) == 0
    # The group being empty does not by itself show that the task body ran
    # to the end, so check the side effect of the task as well.
    assert flag


@gen_test()
async def test_async_task_group_close_prohibits_new_tasks():
    """A closed group refuses new work from ``call_soon`` and ``call_later``.

    Both calls must raise ``AsyncTaskGroupClosedError`` and leave the group
    empty, and the rejected coroutine function must not be executed.
    """
    group = AsyncTaskGroup()
    group.close()

    ev = asyncio.Event()
    flag = False

    async def set_flag():
        nonlocal flag
        await ev.wait()
        flag = True

    with pytest.raises(AsyncTaskGroupClosedError):
        group.call_soon(set_flag)
    assert len(group) == 0

    with pytest.raises(AsyncTaskGroupClosedError):
        group.call_later(1, set_flag)
    assert len(group) == 0

    # Release the event before waiting. Without this, a set_flag that had
    # been scheduled despite the rejection would stay parked on ev.wait()
    # forever and the assertion below could never fail.
    # NOTE: the 0.01s wait is shorter than the 1s call_later delay, so this
    # check only guards against a task that starts within that window.
    ev.set()
    await asyncio.sleep(0.01)
    assert not flag


@gen_test()
async def test_async_task_group_stop_disallows_shutdown():
    """Stopping right after scheduling keeps the task body from ever running.

    The task would record ``asyncio.current_task()`` as its first action, so
    a recorded value of ``None`` after ``stop()`` shows the body was never
    entered.
    """
    task_group = AsyncTaskGroup()
    observed_task = None

    async def record_current_task():
        nonlocal observed_task
        observed_task = asyncio.current_task()

    scheduled = task_group.call_soon(record_current_task)
    assert scheduled is None
    assert len(task_group) == 1

    # There is no grace period: when the group is stopped immediately, the
    # task is not even allowed to start.
    await task_group.stop()
    assert observed_task is None


@gen_test()
async def test_async_task_group_stop_cancels_long_running():
    """``stop`` cancels a task that is already running.

    The task signals that it has started, then sleeps for 10 seconds. The
    group is stopped while it sleeps, so the task must end up cancelled and
    the code after the sleep must never run.
    """
    task_group = AsyncTaskGroup()

    running_task = None
    finished = False
    entered = asyncio.Event()

    async def sleep_then_finish():
        nonlocal running_task, finished
        running_task = asyncio.current_task()
        entered.set()
        await asyncio.sleep(10)
        finished = True
        return True

    scheduled = task_group.call_soon(sleep_then_finish)
    assert scheduled is None
    assert len(task_group) == 1

    # Only stop once the task body has definitely been entered.
    await entered.wait()
    await task_group.stop()

    assert running_task is not None
    assert running_task.cancelled()
    assert not finished