File: test_asyncio.py

package info (click to toggle)
dask.distributed 2024.12.1+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (98 lines) | stat: -rw-r--r-- 2,105 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from __future__ import annotations

import asyncio

import pytest

from distributed._asyncio import RLock
from distributed.utils_test import gen_test


@gen_test()
async def test_rlock():
    """Exercise reentrant acquisition, contention and owner-checked release.

    Two distinct-but-equal owners enter the lock together, a different owner
    stays pending while they hold it, and releasing with an owner that does
    not hold the lock raises ``RuntimeError``.
    """
    lock = RLock()

    # Reentrancy has to be decided by equality ('=='), not identity ('is'),
    # hence two separate list objects that merely compare equal.
    first_owner = ["x"]
    second_owner = ["x"]
    assert first_owner is not second_owner, "internalized objects"

    async with lock(first_owner):
        async with lock(second_owner):
            assert lock._owner == ["x"]
            assert lock._count == 2

            # A non-matching owner must still be waiting after a short pause.
            waiter = asyncio.create_task(lock.acquire("y"))
            await asyncio.sleep(0.01)
            assert not waiter.done()

            # Releasing on behalf of an owner that is not the holder is rejected.
            with pytest.raises(RuntimeError):
                lock.release("z")

    # Both holders have left the lock; the queued acquire can now finish.
    await waiter
    lock.release("y")

    # With nothing held any more, a further release is rejected as well.
    with pytest.raises(RuntimeError):
        lock.release("w")

    # The lock must be back in its initial, unowned state.
    assert lock._owner is None
    assert lock._count == 0
    assert not lock._lock.locked()


@gen_test()
async def test_rlock_nonreentrant():
    """Owners that do not compare equal cannot hold the RLock together."""
    lock = RLock()
    holder = object()
    contender = object()
    assert holder != contender

    async with lock(holder):
        assert lock._owner is holder
        # The second, unequal owner must still be waiting after a short pause.
        pending = asyncio.create_task(lock.acquire(contender))
        await asyncio.sleep(0.01)
        assert not pending.done()
    # Leaving the block frees the lock for the pending acquire.
    await pending
    assert lock._owner is contender
    lock.release(contender)

    # Everything has been released again.
    assert lock._owner is None
    assert lock._count == 0
    assert not lock._lock.locked()


@gen_test()
async def test_rlock_none():
    """``None`` may be used as an owner, including reentrantly."""
    lock = RLock()

    async with lock(None):
        async with lock(None):
            # Held twice by the ``None`` owner: the owner attribute alone cannot
            # tell this apart from "unlocked", so check count and inner lock too.
            assert lock._owner is None
            assert lock._count == 2
            assert lock._lock.locked()

            # A different owner must still be waiting after a short pause.
            waiter = asyncio.create_task(lock.acquire("x"))
            await asyncio.sleep(0.01)
            assert not waiter.done()

    # Both ``None`` holders are gone; the queued acquire can complete.
    await waiter
    assert lock._owner == "x"
    lock.release("x")

    # The lock must be back in its initial, unowned state.
    assert lock._owner is None
    assert lock._count == 0
    assert not lock._lock.locked()


@gen_test()
async def test_rlock_release_on_raise():
    """The lock is released when the body of ``async with`` raises."""
    lock = RLock()
    with pytest.raises(ZeroDivisionError):
        async with lock("x"):
            # Deliberately fail while the lock is held.
            1 / 0

    # The exception must not leave the lock owned or locked.
    assert lock._owner is None
    assert lock._count == 0
    assert not lock._lock.locked()