File: test_nvml.py

Package: dask.distributed 2022.12.1+ds.1-3 (Debian bookworm)
from __future__ import annotations

import multiprocessing as mp
import os

import pytest

pytestmark = pytest.mark.gpu

pynvml = pytest.importorskip("pynvml")

import dask

from distributed.diagnostics import nvml
from distributed.utils_test import gen_cluster


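# Ensure each test starts from a clean slate: shut down any live NVML session and
# reset the module-level initialization state tracked by distributed.diagnostics.nvml.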
@pytest.fixture(autouse=True)
def reset_nvml_state():
    try:
        pynvml.nvmlShutdown()
    except pynvml.NVMLError_Uninitialized:
        pass
    nvml.NVML_STATE = nvml.NVMLState.UNINITIALIZED
    nvml.NVML_OWNER_PID = None


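# one_time() reports static per-device information; check that the device name and
# total memory are present.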
def test_one_time():
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    output = nvml.one_time()
    assert "memory-total" in output
    assert "name" in output

    assert len(output["name"]) > 0


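# Setting ``distributed.diagnostics.nvml: False`` in the Dask config must keep NVML
# disabled, and that decision must persist across later init_once() calls.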
def test_enable_disable_nvml():
    with dask.config.set({"distributed.diagnostics.nvml": False}):
        nvml.init_once()
        assert not nvml.is_initialized()
        assert nvml.NVML_STATE == nvml.NVMLState.DISABLED_CONFIG

    # Idempotent (once we've decided not to turn things on with
    # configuration, it's set in stone)
    nvml.init_once()
    assert not nvml.is_initialized()
    assert nvml.NVML_STATE == nvml.NVMLState.DISABLED_CONFIG


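# The test environment is expected to have a driver recent enough that GPU monitoring
# is not disabled on WSL.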
def test_wsl_monitoring_enabled():
    nvml.init_once()
    assert nvml.NVML_STATE != nvml.NVMLState.DISABLED_WSL_INSUFFICIENT_DRIVER


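# Helper executed in a spawned subprocess: verify there is no CUDA context at first,
# then create one with Numba and check that has_cuda_context() reports it on device 0.
# Any exception is sent back to the parent through the queue.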
def run_has_cuda_context(queue):
    try:
        assert not nvml.has_cuda_context().has_context

        import numba.cuda

        numba.cuda.current_context()
        ctx = nvml.has_cuda_context()
        assert (
            ctx.has_context
            and ctx.device_info.device_index == 0
            and isinstance(ctx.device_info.uuid, bytes)
        )

        queue.put(None)

    except Exception as e:
        queue.put(e)


@pytest.mark.xfail(reason="If running on Docker, requires --pid=host")
def test_has_cuda_context():
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    # This test is run in a new process so that it definitely doesn't already have a
    # CUDA context; a queue is used to pass exceptions back to the parent.
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    p = ctx.Process(target=run_has_cuda_context, args=(queue,))
    p.start()
    p.join()  # this blocks until the process terminates
    e = queue.get()
    if e is not None:
        raise e


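# With a single visible device, the handle returned by _pynvml_handles() must refer to
# the same GPU that one_time() reports on, so the total memory values match.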
def test_1_visible_devices():
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    output = nvml.one_time()
    h = nvml._pynvml_handles()
    assert output["memory-total"] == pynvml.nvmlDeviceGetMemoryInfo(h).total


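# With two GPUs, _pynvml_handles() must honour the ordering in CUDA_VISIBLE_DEVICES
# and return a handle to the first listed device.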
@pytest.mark.parametrize("CVD", ["1,0", "0,1"])
def test_2_visible_devices(CVD):
    if nvml.device_get_count() < 2:
        pytest.skip("Less than two GPUs available")

    os.environ["CUDA_VISIBLE_DEVICES"] = CVD
    idx = int(CVD.split(",")[0])

    h = nvml._pynvml_handles()
    h2 = pynvml.nvmlDeviceGetHandleByIndex(idx)

    s = pynvml.nvmlDeviceGetSerial(h)
    s2 = pynvml.nvmlDeviceGetSerial(h2)

    assert s == s2


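# Workers should report GPU metrics (memory used) and startup information (device
# name) to the scheduler.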
@gen_cluster()
async def test_gpu_metrics(s, a, b):
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    h = nvml._pynvml_handles()

    assert "gpu" in a.metrics
    assert (
        s.workers[a.address].metrics["gpu"]["memory-used"]
        == pynvml.nvmlDeviceGetMemoryInfo(h).used
    )
    assert "gpu" in a.startup_information
    assert (
        s.workers[a.address].extra["gpu"]["name"]
        == pynvml.nvmlDeviceGetName(h).decode()
    )


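# The most recent monitor sample should match a direct pynvml query; marked flaky
# because utilization and memory use can change between the two reads.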
@pytest.mark.flaky(reruns=5, reruns_delay=2)
@gen_cluster()
async def test_gpu_monitoring_recent(s, a, b):
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    h = nvml._pynvml_handles()
    res = await s.get_worker_monitor_info(recent=True)

    assert (
        res[a.address]["range_query"]["gpu_utilization"]
        == pynvml.nvmlDeviceGetUtilizationRates(h).gpu
    )
    assert (
        res[a.address]["range_query"]["gpu_memory_used"]
        == pynvml.nvmlDeviceGetMemoryInfo(h).used
    )
    assert res[a.address]["gpu_name"] == pynvml.nvmlDeviceGetName(h).decode()
    assert res[a.address]["gpu_memory_total"] == pynvml.nvmlDeviceGetMemoryInfo(h).total


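# The full range query should expose GPU utilization and memory series, along with
# sample counts and timestamps, for every worker.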
@gen_cluster()
async def test_gpu_monitoring_range_query(s, a, b):
    if nvml.device_get_count() < 1:
        pytest.skip("No GPUs available")

    res = await s.get_worker_monitor_info()
    ms = ["gpu_utilization", "gpu_memory_used"]
    for w in (a, b):
        assert all(res[w.address]["range_query"][m] is not None for m in ms)
        assert res[w.address]["count"] is not None
        assert res[w.address]["last_time"] is not None