1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
from __future__ import annotations
import pytest
from distributed.client import wait
from distributed.utils_test import (
fetch_metrics,
fetch_metrics_sample_names,
gen_cluster,
slowinc,
)
@gen_cluster(client=True)
async def test_prometheus(c, s, a, b):
pytest.importorskip("prometheus_client")
active_metrics = await fetch_metrics_sample_names(
s.http_server.port, prefix="dask_stealing_"
)
expected_metrics = {
"dask_stealing_request_count_total",
"dask_stealing_request_cost_total",
}
assert active_metrics == expected_metrics
@gen_cluster(client=True)
async def test_prometheus_collect_count_total_by_cost_multipliers(c, s, a, b):
pytest.importorskip("prometheus_client")
async def fetch_metrics_by_cost_multipliers():
families = await fetch_metrics(s.http_server.port, prefix="dask_stealing_")
active_metrics = {
sample.labels["cost_multiplier"]: sample.value
for sample in families["dask_stealing_request_count"].samples
if sample.name == "dask_stealing_request_count_total"
}
return active_metrics
active_metrics = await fetch_metrics_by_cost_multipliers()
stealing = s.extensions["stealing"]
expected_metrics = {str(multiplier): 0 for multiplier in stealing.cost_multipliers}
assert active_metrics == expected_metrics
futures = c.map(
slowinc, range(10), delay=0.1, workers=a.address, allow_other_workers=True
)
await wait(futures)
active_metrics = await fetch_metrics_by_cost_multipliers()
assert len(active_metrics) == len(stealing.cost_multipliers)
count = sum(active_metrics.values())
assert count > 0
expected_count = sum(
len(event[1]) for _, event in s.events["stealing"] if event[0] == "request"
)
assert count == expected_count
@gen_cluster(client=True)
async def test_prometheus_collect_cost_total_by_cost_multipliers(c, s, a, b):
pytest.importorskip("prometheus_client")
async def fetch_metrics_by_cost_multipliers():
families = await fetch_metrics(s.http_server.port, prefix="dask_stealing_")
active_metrics = {
sample.labels["cost_multiplier"]: sample.value
for sample in families["dask_stealing_request_cost"].samples
if sample.name == "dask_stealing_request_cost_total"
}
return active_metrics
active_metrics = await fetch_metrics_by_cost_multipliers()
stealing = s.extensions["stealing"]
expected_metrics = {str(multiplier): 0 for multiplier in stealing.cost_multipliers}
assert active_metrics == expected_metrics
futures = c.map(
slowinc, range(10), delay=0.1, workers=a.address, allow_other_workers=True
)
await wait(futures)
active_metrics = await fetch_metrics_by_cost_multipliers()
assert len(active_metrics) == len(stealing.cost_multipliers)
count = sum(active_metrics.values())
assert count > 0
expected_cost = sum(
request[3]
for _, event in s.events["stealing"]
for request in event[1]
if event[0] == "request"
)
assert count == expected_cost
@gen_cluster(
client=True, clean_kwargs={"threads": False}, scheduler_kwargs={"extensions": {}}
)
async def test_prometheus_without_stealing_extension(c, s, a, b):
pytest.importorskip("prometheus_client")
active_metrics = await fetch_metrics(s.http_server.port, "dask_stealing_")
assert not active_metrics
|