File: test_stealing_http.py

package info (click to toggle)
dask.distributed 2022.12.1+ds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (104 lines) | stat: -rw-r--r-- 3,502 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from __future__ import annotations

import pytest

from distributed.client import wait
from distributed.utils_test import (
    fetch_metrics,
    fetch_metrics_sample_names,
    gen_cluster,
    slowinc,
)


@gen_cluster(client=True)
async def test_prometheus(c, s, a, b):
    """The scheduler exposes exactly the two ``dask_stealing_`` sample names.

    Skipped when ``prometheus_client`` is not installed.
    """
    pytest.importorskip("prometheus_client")
    wanted = {
        "dask_stealing_request_count_total",
        "dask_stealing_request_cost_total",
    }
    exposed = await fetch_metrics_sample_names(
        s.http_server.port, prefix="dask_stealing_"
    )
    assert exposed == wanted


@gen_cluster(client=True)
async def test_prometheus_collect_count_total_by_cost_multipliers(c, s, a, b):
    """Check ``dask_stealing_request_count_total`` against the scheduler event log.

    Before any work runs, every cost multiplier of the stealing extension is
    reported with a value of 0. A workload is then pinned to ``a`` while still
    being allowed on other workers. Once it completes, the counts summed over
    all labels must be positive and must equal the number of requests recorded
    in the scheduler's ``"stealing"`` events.
    """
    pytest.importorskip("prometheus_client")

    async def request_counts_by_multiplier():
        # Keep only the ``*_total`` samples, keyed by their multiplier label.
        families = await fetch_metrics(s.http_server.port, prefix="dask_stealing_")
        family = families["dask_stealing_request_count"]
        counts = {}
        for sample in family.samples:
            if sample.name == "dask_stealing_request_count_total":
                counts[sample.labels["cost_multiplier"]] = sample.value
        return counts

    counts = await request_counts_by_multiplier()
    stealing = s.extensions["stealing"]
    assert counts == dict.fromkeys(map(str, stealing.cost_multipliers), 0)

    futures = c.map(
        slowinc, range(10), delay=0.1, workers=a.address, allow_other_workers=True
    )
    await wait(futures)

    counts = await request_counts_by_multiplier()
    assert len(counts) == len(stealing.cost_multipliers)
    observed = sum(counts.values())
    assert observed > 0

    # Each "request" event carries a batch of requests in its second slot.
    expected = 0
    for _, event in s.events["stealing"]:
        if event[0] == "request":
            expected += len(event[1])
    assert observed == expected


@gen_cluster(client=True)
async def test_prometheus_collect_cost_total_by_cost_multipliers(c, s, a, b):
    """Check ``dask_stealing_request_cost_total`` against the scheduler event log.

    Before any work runs, every cost multiplier of the stealing extension is
    reported with a value of 0. A workload is then pinned to ``a`` while still
    being allowed on other workers. Once it completes, the costs summed over
    all labels must be positive and must match the total cost of the requests
    recorded in the scheduler's ``"stealing"`` events.
    """
    pytest.importorskip("prometheus_client")

    async def fetch_metrics_by_cost_multipliers():
        # Map each ``cost_multiplier`` label to its ``*_total`` sample value.
        families = await fetch_metrics(s.http_server.port, prefix="dask_stealing_")
        active_metrics = {
            sample.labels["cost_multiplier"]: sample.value
            for sample in families["dask_stealing_request_cost"].samples
            if sample.name == "dask_stealing_request_cost_total"
        }
        return active_metrics

    active_metrics = await fetch_metrics_by_cost_multipliers()
    stealing = s.extensions["stealing"]
    expected_metrics = {str(multiplier): 0 for multiplier in stealing.cost_multipliers}
    assert active_metrics == expected_metrics

    futures = c.map(
        slowinc, range(10), delay=0.1, workers=a.address, allow_other_workers=True
    )
    await wait(futures)

    active_metrics = await fetch_metrics_by_cost_multipliers()
    assert len(active_metrics) == len(stealing.cost_multipliers)
    total_cost = sum(active_metrics.values())
    assert total_cost > 0
    # Filter on the event kind before looking inside the payload, so that
    # payloads of non-"request" events are never iterated. Index 3 of each
    # request record is taken as that request's cost.
    expected_cost = sum(
        request[3]
        for _, event in s.events["stealing"]
        if event[0] == "request"
        for request in event[1]
    )
    # The metric sums costs per label and then across labels, while
    # ``expected_cost`` sums them in event order. If the costs are floats, the
    # two totals can differ in the last bits, so compare approximately.
    assert total_cost == pytest.approx(expected_cost)


@gen_cluster(
    client=True, clean_kwargs={"threads": False}, scheduler_kwargs={"extensions": {}}
)
async def test_prometheus_without_stealing_extension(c, s, a, b):
    """With no scheduler extensions configured, no ``dask_stealing_`` metrics appear.

    Skipped when ``prometheus_client`` is not installed.
    """
    pytest.importorskip("prometheus_client")

    # Pass ``prefix`` by keyword, matching the other ``fetch_metrics`` calls here.
    active_metrics = await fetch_metrics(s.http_server.port, prefix="dask_stealing_")
    assert not active_metrics