File: test_cluster_dump_plugin.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (23 lines) | stat: -rw-r--r-- 811 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from __future__ import annotations

from distributed.cluster_dump import DumpArtefact
from distributed.diagnostics.cluster_dump import ClusterDump
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_cluster_dump_plugin(c, s, *workers, tmp_path):
    dump_file = tmp_path / "cluster_dump.msgpack.gz"
    await c.register_scheduler_plugin(ClusterDump(str(dump_file)), name="cluster-dump")
    plugin = s.plugins["cluster-dump"]
    assert plugin.scheduler is s

    f1 = c.submit(inc, 1)
    f2 = c.submit(inc, f1)

    assert (await f2) == 3
    await s.close()

    dump = DumpArtefact.from_url(str(dump_file))
    assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys())
    assert {f1.key, f2.key} == set(dump.worker_story(f1.key, f2.key).keys())