1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
from __future__ import annotations
import asyncio
import sys
from collections.abc import AsyncGenerator, Coroutine, Generator
from typing import Any, cast
import pytest
import anyio
from anyio import (
Event,
TaskInfo,
create_task_group,
get_current_task,
get_running_tasks,
move_on_after,
wait_all_tasks_blocked,
)
from anyio.abc import TaskStatus
from .conftest import asyncio_params
pytestmark = pytest.mark.anyio
get_coro = asyncio.Task.get_coro
def test_main_task_name(
anyio_backend_name: str, anyio_backend_options: dict[str, Any]
) -> None:
task_name = None
async def main() -> None:
nonlocal task_name
task_name = get_current_task().name
anyio.run(main, backend=anyio_backend_name, backend_options=anyio_backend_options)
assert task_name == "tests.test_debugging.test_main_task_name.<locals>.main"
# Work around sniffio/asyncio bug that leaves behind an unclosed event loop
if anyio_backend_name == "asyncio":
import asyncio
import gc
for loop in [
obj
for obj in gc.get_objects()
if isinstance(obj, asyncio.AbstractEventLoop)
]:
loop.close()
@pytest.mark.parametrize(
"name_input,expected",
[
(None, "tests.test_debugging.test_non_main_task_name.<locals>.non_main"),
(b"name", "b'name'"),
("name", "name"),
("", ""),
],
)
async def test_non_main_task_name(
name_input: bytes | str | None, expected: str
) -> None:
async def non_main(*, task_status: TaskStatus) -> None:
task_status.started(anyio.get_current_task().name)
async with anyio.create_task_group() as tg:
name = await tg.start(non_main, name=name_input)
assert name == expected
async def test_get_running_tasks() -> None:
async def inspect() -> None:
await wait_all_tasks_blocked()
new_tasks = set(get_running_tasks()) - existing_tasks
task_infos[:] = sorted(new_tasks, key=lambda info: info.name or "")
event.set()
event = Event()
task_infos: list[TaskInfo] = []
host_task = get_current_task()
async with create_task_group() as tg:
existing_tasks = set(get_running_tasks())
tg.start_soon(event.wait, name="task1")
tg.start_soon(event.wait, name="task2")
tg.start_soon(inspect)
assert len(task_infos) == 3
expected_names = [
"task1",
"task2",
"tests.test_debugging.test_get_running_tasks.<locals>.inspect",
]
for task, expected_name in zip(task_infos, expected_names):
assert task.parent_id == host_task.id
assert task.name == expected_name
assert repr(task).endswith(f"TaskInfo(id={task.id}, name={expected_name!r})")
@pytest.mark.skipif(
sys.version_info >= (3, 11),
reason="Generator based coroutines have been removed in Python 3.11",
)
@pytest.mark.filterwarnings(
'ignore:"@coroutine" decorator is deprecated:DeprecationWarning'
)
def test_wait_generator_based_task_blocked(
asyncio_event_loop: asyncio.AbstractEventLoop,
) -> None:
async def native_coro_part() -> None:
await wait_all_tasks_blocked()
gen = cast(Generator, get_coro(gen_task))
assert not gen.gi_running
coro = cast(Coroutine, gen.gi_yieldfrom)
assert coro.cr_code.co_name == "wait"
event.set()
@asyncio.coroutine # type: ignore[attr-defined]
def generator_part() -> Generator[object, BaseException, None]:
yield from event.wait() # type: ignore[misc]
event = asyncio.Event()
gen_task: asyncio.Task[None] = asyncio_event_loop.create_task(generator_part())
asyncio_event_loop.run_until_complete(native_coro_part())
@pytest.mark.parametrize("anyio_backend", asyncio_params)
async def test_wait_all_tasks_blocked_asend(anyio_backend: str) -> None:
"""Test that wait_all_tasks_blocked() does not crash on an `asend()` object."""
async def agen_func() -> AsyncGenerator[None, None]:
yield
agen = agen_func()
coro = agen.asend(None)
loop = asyncio.get_running_loop()
task = loop.create_task(cast("Coroutine[Any, Any, Any]", coro))
await wait_all_tasks_blocked()
await task
await agen.aclose()
async def test_wait_all_tasks_blocked_cancelled_task() -> None:
done = False
async def self_cancel(*, task_status: TaskStatus) -> None:
nonlocal done
task_status.started()
with move_on_after(-1):
await Event().wait()
done = True
async with create_task_group() as tg:
await tg.start(self_cancel)
await wait_all_tasks_blocked()
assert done
|