File: _limiter.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (72 lines) | stat: -rw-r--r-- 2,359 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
from __future__ import annotations

import asyncio

from distributed.metrics import time


class ResourceLimiter:
    """Limit an abstract resource

    This allows us to track usage of an abstract resource. If the usage of this
    resource goes beyond a defined maxvalue, we can block further execution

    Example::

        limiter = ResourceLimiter(2)
        limiter.increase(1)
        limiter.increase(2)
        await limiter.decrease(1)

        # This will block since we're still not below maxvalue
        await limiter.wait_for_available()

    Attributes
    ----------
    time_blocked_total:
        Sum of the durations of all completed blocking waits in
        ``wait_for_available``, in the unit returned by ``time()``.
    time_blocked_avg:
        Exponentially weighted moving average of the per-call blocked
        duration (weight 0.1 for the newest call). Calls that return without
        blocking contribute a duration of zero.
    """

    def __init__(self, maxvalue: int) -> None:
        self._maxvalue = maxvalue
        # Amount currently acquired; may exceed ``_maxvalue`` because
        # ``increase`` never blocks or refuses.
        self._acquired = 0
        self._condition = asyncio.Condition()
        # Number of coroutines currently parked inside ``wait_for_available``.
        self._waiters = 0
        self.time_blocked_total = 0.0
        self.time_blocked_avg = 0.0

    def __repr__(self) -> str:
        return f"<ResourceLimiter maxvalue: {self._maxvalue} available: {self.available()}>"

    def available(self) -> int:
        """How far can the value be increased before blocking

        Never negative: returns 0 when the acquired amount is at or above
        maxvalue.
        """
        return max(0, self._maxvalue - self._acquired)

    def free(self) -> bool:
        """Return True if nothing has been acquired / the limiter is in a neutral state"""
        return self._acquired == 0

    async def wait_for_available(self) -> None:
        """Block until the counter drops below maxvalue

        Returns immediately if capacity is already available. Otherwise waits
        on the internal condition until ``available()`` is non-zero. The
        blocked-time statistics are updated on every call, including calls
        that do not block (with a duration of zero).
        """
        start = time()
        duration = 0.0
        try:
            if self.available():
                return
            async with self._condition:
                self._waiters += 1
                try:
                    # ``available()`` returns an int; wait_for treats any
                    # non-zero value as "predicate satisfied".
                    await self._condition.wait_for(self.available)
                finally:
                    # Always undo the bookkeeping, even if the wait is
                    # cancelled or raises; otherwise the waiter count leaks.
                    self._waiters -= 1
                duration = time() - start
        finally:
            self.time_blocked_total += duration
            self.time_blocked_avg = self.time_blocked_avg * 0.9 + duration * 0.1

    def increase(self, value: int) -> None:
        """Increase the internal counter by value

        This never blocks; the counter is allowed to exceed maxvalue.
        """
        self._acquired += value

    async def decrease(self, value: int) -> None:
        """Decrease the internal counter by value

        Wakes up every coroutine blocked in ``wait_for_available`` so it can
        re-check whether capacity is available.

        Raises
        ------
        RuntimeError
            If ``value`` is larger than the amount currently acquired.
        """
        if value > self._acquired:
            raise RuntimeError(
                f"Cannot release more than what was acquired! release: {value} acquired: {self._acquired}"
            )
        self._acquired -= value
        async with self._condition:
            self._condition.notify_all()