File: test_system_monitor.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (96 lines) | stat: -rw-r--r-- 2,194 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from __future__ import annotations

from time import sleep

import dask

from distributed.system_monitor import SystemMonitor


def test_SystemMonitor():
    sm = SystemMonitor()

    # __init__ calls update()
    a = sm.recent()
    assert any(v > 0 for v in a.values())

    sleep(0.01)
    b = sm.update()
    assert set(a) == set(b)

    for name in (
        "cpu",
        "memory",
        "time",
        "host_net_io.read_bps",
        "host_net_io.write_bps",
    ):
        assert all(v >= 0 for v in sm.quantities[name])

    assert all(len(q) == 2 for q in sm.quantities.values())

    assert "cpu" in repr(sm)


def test_count():
    sm = SystemMonitor(maxlen=5)
    assert sm.count == 1
    sm.update()
    assert sm.count == 2

    for _ in range(10):
        sm.update()

    assert sm.count == 12
    for v in sm.quantities.values():
        assert len(v) == 5


def test_range_query():
    sm = SystemMonitor(maxlen=5)

    assert all(len(v) == 1 for v in sm.range_query(0).values())
    assert all(len(v) == 0 for v in sm.range_query(123).values())

    sm.update()
    sm.update()
    sm.update()

    assert all(len(v) == 4 for v in sm.range_query(0).values())
    assert all(len(v) == 3 for v in sm.range_query(1).values())

    for _ in range(10):
        sm.update()

    assert all(len(v) == 4 for v in sm.range_query(10).values())
    assert all(len(v) == 5 for v in sm.range_query(0).values())


def test_disk_config():
    sm = SystemMonitor()
    a = sm.update()
    assert "host_disk_io.read_bps" in a

    sm = SystemMonitor(monitor_disk_io=False)
    a = sm.update()
    assert "host_disk_io.read_bps" not in a

    with dask.config.set({"distributed.admin.system-monitor.disk": False}):
        sm = SystemMonitor()
        a = sm.update()
        assert "host_disk_io.read_bps" not in a


def test_host_cpu():
    sm = SystemMonitor()
    a = sm.update()
    assert "host_cpu.user" not in a

    sm = SystemMonitor(monitor_host_cpu=True)
    a = sm.update()
    assert "host_cpu.user" in a

    with dask.config.set({"distributed.admin.system-monitor.host-cpu": True}):
        sm = SystemMonitor()
        a = sm.update()
        assert "host_cpu.user" in a