1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
from __future__ import annotations
import sys
from collections import deque
from typing import Any
import psutil
import dask
from distributed.compatibility import WINDOWS
from distributed.diagnostics import nvml
from distributed.metrics import monotonic, time
class SystemMonitor:
    """Sample resource-usage metrics and keep a bounded history of each.

    Every call to :meth:`update` takes one sample and appends each measured
    value to the deque of the same name in ``quantities``. Which metrics are
    tracked is decided once, in ``__init__``, from the constructor arguments,
    the dask configuration, the platform and the presence of NVML devices.
    """

    proc: psutil.Process
    maxlen: int | None
    count: int
    last_time: float
    quantities: dict[str, deque[float]]

    monitor_net_io: bool
    monitor_disk_io: bool
    monitor_host_cpu: bool
    _last_net_io_counters: Any  # psutil namedtuple
    _last_disk_io_counters: Any  # psutil namedtuple
    _last_host_cpu_counters: Any  # dynamically-defined psutil namedtuple

    gpu_name: str | None
    gpu_memory_total: int

    def __init__(
        self,
        maxlen: int | None = 7200,
        monitor_disk_io: bool | None = None,
        monitor_host_cpu: bool | None = None,
    ):
        """Set up the tracked series and record a first sample.

        Parameters
        ----------
        maxlen
            Maximum number of samples retained per metric (``None`` means
            unbounded). The default of 7200 gives 1h of capture time assuming
            the default distributed.admin.system-monitor.interval = 500ms.
        monitor_disk_io
            Whether to track host disk throughput. ``None`` reads
            ``distributed.admin.system-monitor.disk`` from the dask config.
        monitor_host_cpu
            Whether to track host-wide CPU times. ``None`` reads
            ``distributed.admin.system-monitor.host-cpu`` from the dask config.
        """

        def new_series() -> deque[float]:
            # All series share the same retention limit
            return deque(maxlen=maxlen)

        self.proc = psutil.Process()
        self.count = 0
        self.maxlen = maxlen
        self.last_time = monotonic()
        self.quantities = {name: new_series() for name in ("cpu", "memory", "time")}

        # --- host network I/O: enabled whenever the counters can be read ---
        try:
            self._last_net_io_counters = psutil.net_io_counters()
        except Exception:
            # FIXME is this possible?
            self.monitor_net_io = False  # pragma: nocover
        else:
            self.monitor_net_io = True
            self.quantities["host_net_io.read_bps"] = new_series()
            self.quantities["host_net_io.write_bps"] = new_series()

        # --- host disk I/O: opt-in through argument or config ---
        if monitor_disk_io is None:
            monitor_disk_io = dask.config.get("distributed.admin.system-monitor.disk")
        if monitor_disk_io:
            try:
                initial_disk = psutil.disk_io_counters()
            except Exception:  # pragma: nocover
                # FIXME is this possible?
                initial_disk = None
            if initial_disk is None:  # pragma: nocover
                # Counters unreadable, or diskless machine
                monitor_disk_io = False
            else:
                self._last_disk_io_counters = initial_disk
                self.quantities["host_disk_io.read_bps"] = new_series()
                self.quantities["host_disk_io.write_bps"] = new_series()
        self.monitor_disk_io = monitor_disk_io

        # --- host CPU times: opt-in through argument or config ---
        if monitor_host_cpu is None:
            monitor_host_cpu = dask.config.get(
                "distributed.admin.system-monitor.host-cpu"
            )
        self.monitor_host_cpu = monitor_host_cpu
        if monitor_host_cpu:
            initial_cpu = psutil.cpu_times()
            self._last_host_cpu_counters = initial_cpu
            # This is a namedtuple whose fields change based on OS and kernel
            # version, so the series names are derived from it at runtime
            self.quantities.update(
                {"host_cpu." + field: new_series() for field in initial_cpu._fields}
            )

        # --- open file descriptors: not tracked on Windows ---
        if not WINDOWS:
            self.quantities["num_fds"] = new_series()

        # --- GPU: only when NVML reports at least one device ---
        has_gpu = nvml.device_get_count() > 0
        if not has_gpu:
            self.gpu_name = None
            self.gpu_memory_total = -1
        else:
            gpu_static = nvml.one_time()
            self.gpu_name = gpu_static["name"]
            self.gpu_memory_total = gpu_static["memory-total"]
            self.quantities["gpu_utilization"] = new_series()
            self.quantities["gpu_memory_used"] = new_series()

        # Take a first sample right away so that every series is populated
        self.update()

    def recent(self) -> dict[str, Any]:
        """Return the latest recorded value of every tracked metric."""
        latest: dict[str, Any] = {}
        for name, series in self.quantities.items():
            latest[name] = series[-1]
        return latest

    def get_process_memory(self) -> int:
        """Sample process memory, as reported by the OS.

        This tiny function exists so that it can be easily mocked in unit tests,
        as the OS allocating and releasing memory is highly volatile and a constant
        source of flakiness.
        """
        mem_info = self.proc.memory_info()
        return mem_info.rss

    def update(self) -> dict[str, Any]:
        """Take one sample, append it to the history and return it.

        Throughput and host CPU figures are computed as the change in the
        underlying counters since the previous call, divided by the elapsed
        monotonic time.

        Returns
        -------
        dict mapping each tracked metric name to the value just sampled.
        """
        wall_now = time()
        mono_now = monotonic()
        elapsed = mono_now - self.last_time
        if not elapsed:
            # Avoid dividing by zero when called twice within the clock resolution
            elapsed = 0.001
        self.last_time = mono_now
        self.count += 1

        with self.proc.oneshot():
            cpu_pct = self.proc.cpu_percent()
            rss = self.get_process_memory()
            sample: dict[str, Any] = {"cpu": cpu_pct, "memory": rss, "time": wall_now}

        if self.monitor_net_io:
            net_now = psutil.net_io_counters()
            net_prev = self._last_net_io_counters
            recv_delta = net_now.bytes_recv - net_prev.bytes_recv
            sent_delta = net_now.bytes_sent - net_prev.bytes_sent
            sample["host_net_io.read_bps"] = recv_delta / elapsed
            sample["host_net_io.write_bps"] = sent_delta / elapsed
            self._last_net_io_counters = net_now

        if self.monitor_disk_io:
            disk_now = psutil.disk_io_counters()
            assert disk_now is not None
            disk_prev = self._last_disk_io_counters
            read_delta = disk_now.read_bytes - disk_prev.read_bytes
            write_delta = disk_now.write_bytes - disk_prev.write_bytes
            sample["host_disk_io.read_bps"] = read_delta / elapsed
            sample["host_disk_io.write_bps"] = write_delta / elapsed
            self._last_disk_io_counters = disk_now

        if self.monitor_host_cpu:
            cpu_now = psutil.cpu_times()
            cpu_prev = self._last_host_cpu_counters
            # cpu_times() has a precision of 2 decimals; suppress noise
            sample.update(
                {
                    "host_cpu." + field: round(
                        (getattr(cpu_now, field) - getattr(cpu_prev, field)) / elapsed,
                        2,
                    )
                    for field in cpu_now._fields
                }
            )
            self._last_host_cpu_counters = cpu_now

        # Note: WINDOWS constant doesn't work with `mypy --platform win32`
        if sys.platform != "win32":
            sample["num_fds"] = self.proc.num_fds()

        if self.gpu_name:
            gpu_now = nvml.real_time()
            sample["gpu_utilization"] = gpu_now["utilization"]
            sample["gpu_memory_used"] = gpu_now["memory-used"]

        for name, value in sample.items():
            # NOTE(review): nothing in this method adds a "count" key to the
            # sample, so this filter looks vestigial; kept to preserve behavior.
            if name == "count":
                continue
            self.quantities[name].append(value)

        return sample

    def __repr__(self) -> str:
        """Return a one-line summary of the latest cpu, memory and fd readings."""
        cpu = self.quantities["cpu"][-1]
        memory_mb = self.quantities["memory"][-1] / 1e6
        if WINDOWS:
            fds = "N/A"
        else:
            fds = self.quantities["num_fds"][-1]
        return "<SystemMonitor: cpu: %d memory: %d MB fds: %s>" % (cpu, memory_mb, fds)

    def range_query(self, start: int) -> dict[str, list]:
        """Return the samples recorded from update number ``start`` onwards.

        ``start`` is compared against ``count``, the number of updates taken so
        far. The window is clipped to the number of retained ``"cpu"`` samples.
        Any series holding fewer samples than the window is padded on the left
        with ``None``.

        Returns
        -------
        dict mapping each metric name to a list of values, oldest first; every
        list is empty when ``start >= count``.
        """
        if start >= self.count:
            return {name: [] for name in self.quantities}

        retained = len(self.quantities["cpu"])
        # Negative index of the first sample to return, never further back
        # than the retained history and never past the latest sample
        first = min(-1, max(-retained, start - self.count))
        window = range(first, 0)

        selected: dict[str, list] = {}
        for name, series in self.quantities.items():
            size = len(series)
            selected[name] = [series[i] if -i <= size else None for i in window]
        return selected
|