1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
from __future__ import annotations
from prometheus_client.core import CounterMetricFamily, GaugeMetricFamily
from distributed.http.prometheus import PrometheusCollector
class SemaphoreMetricCollector(PrometheusCollector):
def __init__(self, server):
super().__init__(server)
self.subsystem = "semaphore"
def collect(self):
try:
sem_ext = self.server.extensions["semaphores"]
except KeyError:
return
semaphore_max_leases_family = GaugeMetricFamily(
self.build_name("max_leases"),
"Maximum leases allowed per semaphore\n"
"Note: This will be constant for "
"each semaphore during its lifetime.",
labels=["name"],
)
semaphore_active_leases_family = GaugeMetricFamily(
self.build_name("active_leases"),
"Amount of currently active leases per semaphore",
labels=["name"],
)
semaphore_pending_leases = GaugeMetricFamily(
self.build_name("pending_leases"),
"Amount of currently pending leases per semaphore",
labels=["name"],
)
semaphore_acquire_total = CounterMetricFamily(
self.build_name("acquire_total"),
"Total number of leases acquired per semaphore",
labels=["name"],
)
semaphore_release_total = CounterMetricFamily(
self.build_name("release_total"),
"Total number of leases released per semaphore\n"
"Note: If a semaphore is closed while there are still leases active, "
"this count will not equal `semaphore_acquired_total` after execution.",
labels=["name"],
)
semaphore_average_pending_lease_time = GaugeMetricFamily(
self.build_name("average_pending_lease_time"),
"Exponential moving average of the time it took to acquire a lease "
"per semaphore\n"
"Note: This only includes time spent on scheduler side, "
"it does not include time spent on communication.\n"
"Note: This average is calculated based on order of leases instead "
"of time of lease acquisition.",
labels=["name"],
unit="s",
)
for semaphore_name, semaphore_max_leases in sem_ext.max_leases.items():
semaphore_max_leases_family.add_metric(
[semaphore_name], semaphore_max_leases
)
semaphore_active_leases_family.add_metric(
[semaphore_name], len(sem_ext.leases[semaphore_name])
)
semaphore_pending_leases.add_metric(
[semaphore_name], sem_ext.metrics["pending"][semaphore_name]
)
semaphore_acquire_total.add_metric(
[semaphore_name], sem_ext.metrics["acquire_total"][semaphore_name]
)
semaphore_release_total.add_metric(
[semaphore_name], sem_ext.metrics["release_total"][semaphore_name]
)
semaphore_average_pending_lease_time.add_metric(
[semaphore_name],
sem_ext.metrics["average_pending_lease_time"][semaphore_name],
)
yield semaphore_max_leases_family
yield semaphore_active_leases_family
yield semaphore_pending_leases
yield semaphore_acquire_total
yield semaphore_release_total
yield semaphore_average_pending_lease_time
|