File: test_components.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (87 lines) | stat: -rw-r--r-- 2,247 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from __future__ import annotations

import pytest

pytest.importorskip("bokeh")

from bokeh.models import ColumnDataSource, Model

from distributed.dashboard.components.shared import (
    Processing,
    ProfilePlot,
    ProfileTimePlot,
)
from distributed.utils_test import gen_cluster, slowinc


@pytest.mark.parametrize("Component", [Processing])
def test_basic(Component):
    """Smoke-test that a component can be built with no arguments.

    A freshly constructed component must expose a bokeh
    ``ColumnDataSource`` as ``source`` and a bokeh ``Model`` as ``root``.
    """
    component = Component()
    expected_types = (
        ("source", ColumnDataSource),
        ("root", Model),
    )
    for attribute, bokeh_type in expected_types:
        assert isinstance(getattr(component, attribute), bokeh_type)


@gen_cluster(
    client=True,
    clean_kwargs={"threads": False},
    config={"distributed.worker.profile.enabled": True},
)
async def test_profile_plot(c, s, a, b):
    """Check that ``ProfilePlot`` fills its data source from profile data.

    The plot starts with an empty ``"left"`` column. The test then keeps
    running a slow task and pushing ``a.profile_recent`` into the plot
    until that column is non-empty.
    """
    plot = ProfilePlot()
    assert not plot.source.data["left"]

    # No explicit retry limit here; presumably the gen_cluster timeout
    # bounds this loop if profile data never shows up.
    while len(plot.source.data["left"]) == 0:
        future = c.submit(slowinc, 1, pure=False, delay=0.1)
        await future
        plot.update(a.profile_recent)


@gen_cluster(
    client=True,
    clean_kwargs={"threads": False},
    config={
        "distributed.worker.profile.enabled": True,
        "distributed.worker.profile.interval": "10ms",
        "distributed.worker.profile.cycle": "50ms",
    },
)
async def test_profile_time_plot(c, s, a, b):
    """Exercise ``ProfileTimePlot`` with profiling enabled.

    One plot is built around ``s`` and another around ``a``. Neither
    subtitle may mention "disabled", both sources start with an empty
    ``"left"`` column, and ``trigger_update`` must run without raising
    both before and after some work has been executed.
    """
    from bokeh.io import curdoc

    sp = ProfileTimePlot(s, doc=curdoc())
    assert "disabled" not in sp.subtitle.text
    sp.trigger_update()

    ap = ProfileTimePlot(a, doc=curdoc())
    # Check the plot that was just created, not the first one again.
    assert "disabled" not in ap.subtitle.text
    ap.trigger_update()

    assert len(sp.source.data["left"]) == 0
    assert len(ap.source.data["left"]) == 0

    # Run some tasks so there is profile activity to report.
    await c.gather(c.map(slowinc, range(10), delay=0.05))

    # Only checks that updating after activity does not raise.
    ap.trigger_update()
    sp.trigger_update()


@gen_cluster(
    client=True,
    clean_kwargs={"threads": False},
    config={
        "distributed.worker.profile.enabled": False,
        "distributed.worker.profile.interval": "10ms",
        "distributed.worker.profile.cycle": "50ms",
    },
)
async def test_profile_time_plot_disabled(c, s, a, b):
    """Exercise ``ProfileTimePlot`` with profiling disabled.

    One plot is built around ``s`` and another around ``a``. Both
    subtitles must mention "disabled", ``trigger_update`` must still run
    without raising, and both sources keep an empty ``"left"`` column.
    """
    from bokeh.io import curdoc

    sp = ProfileTimePlot(s, doc=curdoc())
    assert "disabled" in sp.subtitle.text
    sp.trigger_update()

    ap = ProfileTimePlot(a, doc=curdoc())
    # Check the plot that was just created, not the first one again.
    assert "disabled" in ap.subtitle.text
    ap.trigger_update()

    assert len(sp.source.data["left"]) == 0
    assert len(ap.source.data["left"]) == 0