from __future__ import annotations
import logging
import math
from collections import defaultdict, deque
from collections.abc import Iterable
from datetime import timedelta
from typing import TYPE_CHECKING, cast
import tlz as toolz
from tornado.ioloop import IOLoop
from dask.utils import parse_timedelta
from distributed.compatibility import PeriodicCallback
from distributed.metrics import time
if TYPE_CHECKING:
from distributed.scheduler import WorkerState
logger = logging.getLogger(__name__)
class AdaptiveCore:
"""
    The core logic for adaptive deployments, with none of the cluster details

    This class controls our adaptive scaling behavior. It is intended to be
used as a super-class or mixin. It expects the following state and methods:
**State**
plan: set
A set of workers that we think should exist.
        Here and below, a worker is just a token, often an address or name string.
requested: set
        A set of workers that the cluster class has successfully requested from
        the resource manager. We expect the resource manager to work to make
        these exist.
observed: set
A set of workers that have successfully checked in with the scheduler
    These sets are not necessarily equivalent. Often ``plan`` and ``requested``
    will be very similar (requesting is usually fast), but there may be a large
    delay between ``requested`` and ``observed`` (often resource managers don't
    give us what we want).
**Functions**
target : -> int
Returns the target number of workers that should exist.
This is often obtained by querying the scheduler
workers_to_close : int -> Set[worker]
Given a target number of workers,
returns a set of workers that we should close when we're scaling down
scale_up : int -> None
Scales the cluster up to a target number of workers, presumably
changing at least ``plan`` and hopefully eventually also ``requested``
scale_down : Set[worker] -> None
Closes the provided set of workers
Parameters
----------
minimum: int
The minimum number of allowed workers
maximum: int | inf
The maximum number of allowed workers
wait_count: int
The number of scale-down requests we should receive before actually
scaling down
    interval: str, int, float, or timedelta
        The amount of time, like ``"1s"``, between checks; plain numbers are
        interpreted as seconds
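
    Examples
    --------
    A minimal, illustrative sketch of a subclass; ``MyAdaptive`` and its
    in-memory bookkeeping are hypothetical and not part of this module::

        class MyAdaptive(AdaptiveCore):
            async def target(self) -> int:
                # e.g. derived from scheduler load; fixed here for brevity
                return 2

            async def scale_up(self, n: int) -> None:
                new = {f"worker-{i}" for i in range(n)} - self.plan
                self.plan |= new
                self.requested |= new

            async def scale_down(self, workers) -> None:
                self.plan -= set(workers)
                self.requested -= set(workers)

    Instantiating ``MyAdaptive(minimum=1, maximum=10)`` would then call
    ``adapt`` every ``interval`` and grow or shrink ``plan`` accordingly.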
"""
minimum: int
maximum: int | float
wait_count: int
interval: int | float
periodic_callback: PeriodicCallback | None
plan: set[WorkerState]
requested: set[WorkerState]
observed: set[WorkerState]
close_counts: defaultdict[WorkerState, int]
_adapting: bool
log: deque[tuple[float, dict]]
def __init__(
self,
minimum: int = 0,
maximum: int | float = math.inf,
wait_count: int = 3,
interval: str | int | float | timedelta = "1s",
):
if not isinstance(maximum, int) and not math.isinf(maximum):
raise TypeError(f"maximum must be int or inf; got {maximum}")
self.minimum = minimum
self.maximum = maximum
self.wait_count = wait_count
self.interval = parse_timedelta(interval, "seconds")
self.periodic_callback = None
def f():
try:
self.periodic_callback.start()
except AttributeError:
pass
if self.interval:
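            # Only a weak reference is captured below so that the periodic
            # callback does not keep this AdaptiveCore alive on its own; once
            # the last strong reference goes away, ``__del__`` can run and
            # stop the callback.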
import weakref
self_ref = weakref.ref(self)
async def _adapt():
core = self_ref()
if core:
await core.adapt()
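            # PeriodicCallback expects milliseconds; self.interval is in seconds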
self.periodic_callback = PeriodicCallback(_adapt, self.interval * 1000)
self.loop.add_callback(f)
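        # Subclasses may expose plan/requested/observed as read-only properties
        # backed by live cluster state; assigning to them then raises, and we
        # simply keep the subclass's view.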
try:
self.plan = set()
self.requested = set()
self.observed = set()
except Exception:
pass
# internal state
self.close_counts = defaultdict(int)
self._adapting = False
self.log = deque(maxlen=10000)
def stop(self) -> None:
logger.info("Adaptive stop")
if self.periodic_callback:
self.periodic_callback.stop()
self.periodic_callback = None
async def target(self) -> int:
"""The target number of workers that should exist"""
raise NotImplementedError()
async def workers_to_close(self, target: int) -> list:
"""
Give a list of workers to close that brings us down to target workers
"""
# TODO, improve me with something that thinks about current load
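        # Default policy: close all but ``target`` of the observed workers,
        # chosen without regard to per-worker load or memory.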
return list(self.observed)[target:]
async def safe_target(self) -> int:
"""Used internally, like target, but respects minimum/maximum"""
n = await self.target()
if n > self.maximum:
n = cast(int, self.maximum)
if n < self.minimum:
n = self.minimum
return n
    async def scale_down(self, workers: Iterable) -> None:
        raise NotImplementedError()

    async def scale_up(self, n: int) -> None:
        raise NotImplementedError()
async def recommendations(self, target: int) -> dict:
"""
Make scale up/down recommendations based on current state and target
"""
plan = self.plan
requested = self.requested
observed = self.observed
if target == len(plan):
self.close_counts.clear()
return {"status": "same"}
if target > len(plan):
self.close_counts.clear()
return {"status": "up", "n": target}
# target < len(plan)
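        # Prefer cancelling workers that were requested but have not yet
        # checked in with the scheduler; they hold no data, so dropping them
        # is cheap.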
not_yet_arrived = requested - observed
to_close = set()
if not_yet_arrived:
to_close.update(toolz.take(len(plan) - target, not_yet_arrived))
if target < len(plan) - len(to_close):
L = await self.workers_to_close(target=target)
to_close.update(L)
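        # Require ``wait_count`` consecutive "close" recommendations before a
        # worker is actually closed, adding hysteresis against short dips in
        # load.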
firmly_close = set()
for w in to_close:
self.close_counts[w] += 1
if self.close_counts[w] >= self.wait_count:
firmly_close.add(w)
        for k in list(self.close_counts):  # drop counts for workers now closing or no longer candidates
if k in firmly_close or k not in to_close:
del self.close_counts[k]
if firmly_close:
return {"status": "down", "workers": list(firmly_close)}
else:
return {"status": "same"}
async def adapt(self) -> None:
"""
Check the current state, make recommendations, call scale
This is the main event of the system
"""
if self._adapting: # Semaphore to avoid overlapping adapt calls
return
self._adapting = True
status = None
try:
target = await self.safe_target()
recommendations = await self.recommendations(target)
if recommendations["status"] != "same":
self.log.append((time(), dict(recommendations)))
status = recommendations.pop("status")
if status == "same":
return
if status == "up":
await self.scale_up(**recommendations)
if status == "down":
await self.scale_down(**recommendations)
except OSError:
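            # Connection errors while asking workers to close are tolerated;
            # anything else likely means we lost the scheduler, so adaptivity
            # shuts itself down.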
if status != "down":
logger.error("Adaptive stopping due to error", exc_info=True)
self.stop()
else:
logger.error(
"Error during adaptive downscaling. Ignoring.", exc_info=True
)
finally:
self._adapting = False
def __del__(self):
self.stop()
@property
def loop(self) -> IOLoop:
return IOLoop.current()