1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
from __future__ import annotations
from typing import Any
import pytest
from anyio import create_task_group, run
from anyio.lowlevel import (
RunVar,
cancel_shielded_checkpoint,
checkpoint,
checkpoint_if_cancelled,
)
@pytest.mark.parametrize("cancel", [False, True])
async def test_checkpoint_if_cancelled(cancel: bool) -> None:
    """Exercise ``checkpoint_if_cancelled()`` with and without a cancelled scope.

    The first task spawns a second task, optionally cancels the task group's
    cancel scope, and then awaits ``checkpoint_if_cancelled()``.  The test
    asserts that the first task runs to completion only when the scope was
    *not* cancelled, and that the second task always gets to run.
    """
    first_done = False
    second_done = False

    async def second_task() -> None:
        """Record that this task ran and check the first task's state."""
        nonlocal second_done
        # By the time this task runs, the first task must have finished
        # exactly when no cancellation was requested.
        assert first_done != cancel
        second_done = True

    async def first_task() -> None:
        """Spawn the second task, maybe cancel, then hit the checkpoint."""
        nonlocal first_done
        tg.start_soon(second_task)
        if cancel:
            tg.cancel_scope.cancel()

        await checkpoint_if_cancelled()
        # Reached only if the checkpoint above did not raise.
        first_done = True

    async with create_task_group() as tg:
        tg.start_soon(first_task)

    assert first_done != cancel
    assert second_done
@pytest.mark.parametrize("cancel", [False, True])
async def test_cancel_shielded_checkpoint(cancel: bool) -> None:
    """Exercise ``cancel_shielded_checkpoint()`` with and without cancellation.

    Two tasks are started in a task group whose cancel scope is optionally
    cancelled right after spawning them.  The test asserts that both tasks
    complete in either case, and that the second task runs before the first
    task has resumed from its checkpoint.
    """
    first_done = False
    second_done = False

    async def second_task() -> None:
        """Check the first task is still pending, then mark completion."""
        nonlocal second_done
        # The first task must not have got past its checkpoint yet.
        assert not first_done
        second_done = True

    async def first_task() -> None:
        """Await the shielded checkpoint, then mark completion."""
        nonlocal first_done
        await cancel_shielded_checkpoint()
        first_done = True

    async with create_task_group() as tg:
        tg.start_soon(first_task)
        tg.start_soon(second_task)
        if cancel:
            tg.cancel_scope.cancel()

    # Both tasks are expected to finish whether or not the scope was cancelled.
    assert first_done
    assert second_done
@pytest.mark.parametrize("cancel", [False, True])
async def test_checkpoint(cancel: bool) -> None:
    """Exercise ``checkpoint()`` with and without a cancelled scope.

    Two tasks are started in a task group whose cancel scope is optionally
    cancelled right after spawning them.  The test asserts that the first
    task gets past its checkpoint only when the scope was *not* cancelled,
    and that the second task always runs, before the first task has resumed.
    """
    first_done = False
    second_done = False

    async def second_task() -> None:
        """Check the first task is still pending, then mark completion."""
        nonlocal second_done
        # The first task must not have got past its checkpoint yet.
        assert not first_done
        second_done = True

    async def first_task() -> None:
        """Await a regular checkpoint, then mark completion."""
        nonlocal first_done
        await checkpoint()
        # Reached only if the checkpoint above did not raise.
        first_done = True

    async with create_task_group() as tg:
        tg.start_soon(first_task)
        tg.start_soon(second_task)
        if cancel:
            tg.cancel_scope.cancel()

    assert first_done != cancel
    assert second_done
class TestRunVar:
    """Tests for ``RunVar`` get/set/reset behaviour."""

    def test_get_set(
        self,
        anyio_backend_name: str,
        anyio_backend_options: dict[str, Any],
    ) -> None:
        """Check get/set across child tasks and across separate runs.

        ``main`` is executed twice via ``run()``.  Each time it first asserts
        that the variable has no value (``LookupError``), then verifies that
        a value set by a child task is visible to the parent afterwards.
        """
        run_var = RunVar[int]("var")

        async def child(expected: int) -> None:
            """Verify the value seen by the task, then bump it by one."""
            assert run_var.get() == expected
            run_var.set(expected + 1)

        async def main() -> None:
            """Drive two set/spawn/verify rounds within a single run."""
            # At the start of each run the variable is expected to be unset.
            with pytest.raises(LookupError):
                run_var.get()

            for value in range(2):
                run_var.set(value)
                async with create_task_group() as tg:
                    tg.start_soon(child, value)

                # The child's update must be visible once the group exits.
                assert run_var.get() == value + 1

        for _ in range(2):
            run(
                main,
                backend=anyio_backend_name,
                backend_options=anyio_backend_options,
            )

    async def test_reset_token_used_on_wrong_runvar(self) -> None:
        """Resetting one RunVar with another RunVar's token raises ValueError."""
        token_owner = RunVar[str]("var1")
        other = RunVar[str]("var2")
        foreign_token = token_owner.set("blah")

        expected_message = "This token does not belong to this RunVar"
        with pytest.raises(ValueError, match=expected_message):
            other.reset(foreign_token)

    async def test_reset_token_used_twice(self) -> None:
        """Resetting with an already-used token raises ValueError."""
        run_var = RunVar[str]("var")
        token = run_var.set("blah")

        # The first reset consumes the token; the second one must be rejected.
        run_var.reset(token)
        with pytest.raises(ValueError, match="This token has already been used"):
            run_var.reset(token)

    async def test_runvar_does_not_share_storage_by_name(self) -> None:
        """Two RunVars created with the same name keep independent values."""
        int_var: RunVar[int] = RunVar("var", 1)
        str_var: RunVar[str] = RunVar("var", "a")

        # Each variable initially reports its own default.
        assert int_var.get() == 1
        assert str_var.get() == "a"

        int_var.set(2)
        str_var.set("b")

        # Setting one must not overwrite the other despite the shared name.
        assert int_var.get() == 2
        assert str_var.get() == "b"
|