File: test_components.py

package info (click to toggle)
dask.distributed 2021.01.0+ds.1-2.1+deb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 6,668 kB
  • sloc: python: 54,131; javascript: 1,549; makefile: 207; sh: 100
file content (49 lines) | stat: -rw-r--r-- 1,257 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import asyncio

import pytest

pytest.importorskip("bokeh")

from bokeh.models import ColumnDataSource, Model

from distributed.utils_test import slowinc, gen_cluster
from distributed.dashboard.components.shared import (
    Processing,
    ProfilePlot,
    ProfileTimePlot,
)


@pytest.mark.parametrize("Component", [Processing])
def test_basic(Component):
    """Check that a default-constructed component exposes bokeh objects.

    The component must provide a ``source`` that is a ``ColumnDataSource``
    and a ``root`` that is a bokeh ``Model``.
    """
    component = Component()

    # Data backing the plot.
    assert isinstance(component.source, ColumnDataSource)
    # Top-level bokeh object of the component.
    assert isinstance(component.root, Model)


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_profile_plot(c, s, a, b):
    """ProfilePlot starts empty and gains rows after an update.

    The plot is fed ``a.profile_recent`` once a batch of ``slowinc`` tasks
    has been gathered through ``c``; afterwards its data source must hold at
    least one ``"left"`` entry.
    """
    plot = ProfilePlot()

    # Nothing has been pushed into the plot yet.
    assert not plot.source.data["left"]

    # Run a handful of deliberately slow tasks before reading the profile.
    futures = c.map(slowinc, range(10), delay=0.05)
    await c.gather(futures)

    plot.update(a.profile_recent)
    assert len(plot.source.data["left"]) >= 1


@gen_cluster(client=True, clean_kwargs={"threads": False})
async def test_profile_time_plot(c, s, a, b):
    """Smoke-test ProfileTimePlot built on ``s`` and on ``a``.

    Before any tasks run, each plot may hold at most one ``"left"`` entry
    after its first update. After a batch of ``slowinc`` tasks, both plots
    are updated again; the test only requires that this completes without
    raising.
    """
    from bokeh.io import curdoc

    # NOTE(review): names assume gen_cluster passes the scheduler as ``s``
    # and a worker as ``a`` -- confirm against distributed.utils_test.
    scheduler_plot = ProfileTimePlot(s, doc=curdoc())
    scheduler_plot.trigger_update()

    worker_plot = ProfileTimePlot(a, doc=curdoc())
    worker_plot.trigger_update()

    for plot in (scheduler_plot, worker_plot):
        assert len(plot.source.data["left"]) <= 1

    futures = c.map(slowinc, range(10), delay=0.05)
    await c.gather(futures)

    # Same order as before: the plot for ``a`` first, then the one for ``s``.
    for plot in (worker_plot, scheduler_plot):
        plot.trigger_update()

    # Presumably gives the triggered updates time to run before teardown.
    await asyncio.sleep(0.05)