import toolz

from distributed.http.utils import RequestHandler
from distributed.scheduler import ALL_TASK_STATES

from .semaphore import SemaphoreMetricExtension

class _PrometheusCollector:
    def __init__(self, dask_server):
        self.server = dask_server

    def collect(self):
        from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily

        yield GaugeMetricFamily(
            "dask_scheduler_clients",
            "Number of clients connected.",
            value=len([k for k in self.server.clients if k != "fire-and-forget"]),
        )

        yield GaugeMetricFamily(
            "dask_scheduler_desired_workers",
            "Number of workers scheduler needs for task graph.",
            value=self.server.adaptive_target(),
        )

        worker_states = GaugeMetricFamily(
            "dask_scheduler_workers",
            "Number of workers known by scheduler.",
            labels=["state"],
        )
        worker_states.add_metric(["connected"], len(self.server.workers))
        worker_states.add_metric(["saturated"], len(self.server.saturated))
        worker_states.add_metric(["idle"], len(self.server.idle))
        yield worker_states

        tasks = GaugeMetricFamily(
            "dask_scheduler_tasks",
            "Number of tasks known by scheduler.",
            labels=["state"],
        )

        # Aggregate per-state task counts across all task prefixes
        task_counter = toolz.merge_with(
            sum, (tp.states for tp in self.server.task_prefixes.values())
        )

        suspicious_tasks = CounterMetricFamily(
            "dask_scheduler_tasks_suspicious",
            "Total number of times a task has been marked suspicious",
            labels=["task_prefix_name"],
        )
        for tp in self.server.task_prefixes.values():
            suspicious_tasks.add_metric([tp.name], tp.suspicious)
        yield suspicious_tasks

        yield CounterMetricFamily(
            "dask_scheduler_tasks_forgotten",
            (
                "Total number of processed tasks no longer in memory and already "
                "removed from the scheduler job queue. Note that task groups on "
                "the scheduler which have all tasks in the forgotten state are "
                "not included."
            ),
            value=task_counter.get("forgotten", 0.0),
        )

        for state in ALL_TASK_STATES:
            tasks.add_metric([state], task_counter.get(state, 0.0))
        yield tasks


COLLECTORS = [_PrometheusCollector, SemaphoreMetricExtension]


class PrometheusHandler(RequestHandler):
    _collectors = None

    def __init__(self, *args, dask_server=None, **kwargs):
        import prometheus_client

        super().__init__(*args, dask_server=dask_server, **kwargs)

        if PrometheusHandler._collectors:
            # Especially during testing, multiple schedulers are started
            # sequentially in the same python process. Reuse the already
            # registered collectors and point them at the new scheduler.
            for _collector in PrometheusHandler._collectors:
                _collector.server = self.server
            return

        PrometheusHandler._collectors = tuple(
            collector(self.server) for collector in COLLECTORS
        )
        # Register collectors
        for instantiated_collector in PrometheusHandler._collectors:
            prometheus_client.REGISTRY.register(instantiated_collector)

    def get(self):
        import prometheus_client

        self.write(prometheus_client.generate_latest())
        self.set_header("Content-Type", "text/plain; version=0.0.4")


routes = [("/metrics", PrometheusHandler, {})]