File: test_dask_spec.py

package info (click to toggle)
dask.distributed 2022.12.1+ds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (93 lines) | stat: -rw-r--r-- 2,585 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
from __future__ import annotations

import sys

import yaml

from distributed import Client
from distributed.utils import open_port
from distributed.utils_test import gen_cluster, gen_test, popen


@gen_test(timeout=120)
async def test_text():
    """Launch a scheduler and a worker from inline ``--spec`` JSON text.

    Both processes are started with ``python -m distributed.cli.dask_spec``.
    The scheduler is given a freshly opened port; the worker is pointed at
    that port. A client then waits for one worker and checks that the
    worker reports the name and thread count given in its spec.
    """
    port = open_port()
    address = "tcp://localhost:%d" % port

    # Common prefix of both command lines.
    launcher = [sys.executable, "-m", "distributed.cli.dask_spec"]
    scheduler_spec = '{"cls": "dask.distributed.Scheduler", "opts": {"port": %d}}' % port
    worker_spec = '{"cls": "dask.distributed.Worker", "opts": {"nanny": false, "nthreads": 3, "name": "foo"}}'

    scheduler_cmd = [*launcher, "--spec", scheduler_spec]
    worker_cmd = [*launcher, address, "--spec", worker_spec]

    # The scheduler process is entered first and exited last.
    with popen(scheduler_cmd), popen(worker_cmd):
        async with Client(address, asynchronous=True) as client:
            await client.wait_for_workers(1)
            identity = await client.scheduler.identity()
            # Exactly one worker is expected; unpacking fails otherwise.
            (worker,) = identity["workers"].values()
            assert worker["name"] == "foo"
            assert worker["nthreads"] == 3


@gen_cluster(client=True, nthreads=[])
async def test_file(c, s, tmp_path):
    """Launch a worker from a YAML spec file passed via ``--spec-file``.

    A worker spec is dumped to ``foo.yaml`` under ``tmp_path``, then
    ``python -m distributed.cli.dask_spec`` is started against the
    scheduler address ``s.address``. The client ``c`` waits for one worker
    and checks that it reports the name and thread count from the file.
    """
    worker_spec = {
        "cls": "dask.distributed.Worker",
        "opts": {"nanny": False, "nthreads": 3, "name": "foo"},
    }
    fn = str(tmp_path / "foo.yaml")
    with open(fn, "w") as spec_file:
        yaml.dump(worker_spec, spec_file)

    command = [
        sys.executable,
        "-m",
        "distributed.cli.dask_spec",
        s.address,
        "--spec-file",
        fn,
    ]
    with popen(command):
        await c.wait_for_workers(1)
        identity = await c.scheduler.identity()
        # Exactly one worker is expected; unpacking fails otherwise.
        (worker,) = identity["workers"].values()
        assert worker["name"] == "foo"
        assert worker["nthreads"] == 3


def test_errors():
    """Check the CLI's message when ``--spec``/``--spec-file`` are misused.

    Two invocations of ``python -m distributed.cli.dask_spec`` are run in
    order: one passing both ``--spec`` and ``--spec-file``, and one passing
    neither. For each, the first line of captured output must contain
    "exactly one" and mention both option names.
    """
    base_cmd = [sys.executable, "-m", "distributed.cli.dask_spec"]
    both_given = [*base_cmd, "--spec", '{"foo": "bar"}', "--spec-file", "foo.yaml"]
    neither_given = list(base_cmd)

    for command in (both_given, neither_given):
        with popen(command, capture_output=True) as proc:
            first_line = proc.stdout.readline().decode()
            assert "exactly one" in first_line
            assert "--spec" in first_line
            assert "--spec-file" in first_line