File: test_limiter.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (112 lines) | stat: -rw-r--r-- 2,787 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from __future__ import annotations

import asyncio

import pytest

from distributed.metrics import time
from distributed.shuffle._limiter import ResourceLimiter
from distributed.utils_test import gen_test


@gen_test()
async def test_limiter_basic():
    res = ResourceLimiter(5)

    assert isinstance(repr(res), str)
    res.increase(2)
    assert res.available() == 3
    res.increase(3)

    assert not res.available()
    # This is too much
    res.increase(1)
    assert not res.available()
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(res.wait_for_available(), 0.1)

    await res.decrease(1)
    assert not res.available()

    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(res.wait_for_available(), 0.1)

    await res.decrease(1)
    assert res.available() == 1
    await res.wait_for_available()

    res.increase(1)
    assert not res.available()
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(res.wait_for_available(), 0.1)

    await res.decrease(5)
    assert res.available() == 5

    with pytest.raises(RuntimeError, match="more"):
        await res.decrease(1)

    res.increase(10)
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(res.wait_for_available(), 0.1)

    await res.decrease(3)
    assert not res.available()

    await res.decrease(5)
    assert res.available() == 3


@gen_test()
async def test_limiter_concurrent_decrease_releases_waiter():
    res = ResourceLimiter(5)
    res.increase(5)

    wait_for_available = asyncio.create_task(res.wait_for_available())
    event = asyncio.Event()

    async def decrease():
        await event.wait()
        await res.decrease(5)

    decrease_buffer = asyncio.create_task(decrease())
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(asyncio.shield(wait_for_available), 0.1)

    event.set()
    await wait_for_available


@gen_test()
async def test_limiter_statistics():
    res = ResourceLimiter(1)

    assert res.time_blocked_avg == 0.0
    assert res.time_blocked_total == 0.0

    await res.wait_for_available()

    assert res.time_blocked_avg == 0.0
    assert res.time_blocked_total == 0.0

    res.increase(1)
    start = time()
    blocked_wait = asyncio.create_task(res.wait_for_available())

    await asyncio.sleep(0.05)

    assert not blocked_wait.done()

    await res.decrease(1)

    await blocked_wait
    stop = time()
    assert stop - start >= res.time_blocked_total > 0.0
    assert res.time_blocked_total > res.time_blocked_avg

    before_total = res.time_blocked_total
    before_avg = res.time_blocked_avg

    await res.wait_for_available()
    assert before_total == res.time_blocked_total
    assert before_avg > res.time_blocked_avg