File: test_cluster.py

package info (click to toggle)
dask.distributed 2024.12.1+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (89 lines) | stat: -rw-r--r-- 2,837 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from __future__ import annotations

import pytest
from tornado.ioloop import IOLoop

from distributed import LocalCluster, Status
from distributed.deploy.cluster import Cluster, _exponential_backoff
from distributed.utils_test import gen_test


@gen_test()
async def test_eq():
    """Check ``==`` and ``!=`` on ``Cluster`` objects.

    Every comparison is asserted through both operators so that equality
    and inequality are each exercised explicitly.
    """
    async with (
        Cluster(asynchronous=True, name="A") as first,
        Cluster(asynchronous=True, name="A2") as similar,
        Cluster(asynchronous=True, name="B") as other,
    ):
        # A plain string must not compare equal, even though it matches the
        # name the cluster was created with.
        assert first != "A"
        assert not (first == "A")

        # A cluster compares equal to itself.
        assert first == first
        assert not (first != first)

        # Clusters created with different names compare unequal.
        for different in (similar, other):
            assert first != different
            assert not (first == different)


@gen_test()
async def test_repr():
    """Check the ``repr`` of a bare ``Cluster`` created with the name "A"."""
    async with Cluster(asynchronous=True, name="A") as cluster:
        # The scheduler address placeholder is also embedded in the repr below.
        not_connected = "<Not Connected>"
        assert cluster.scheduler_address == not_connected
        assert repr(cluster) == (
            "Cluster(A, '<Not Connected>', workers=0, threads=0, memory=0 B)"
        )


@gen_test()
async def test_cluster_wait_for_worker():
    """Scale a ``LocalCluster`` from 2 to 4 workers and wait for them.

    Once ``wait_for_workers(4)`` has been awaited, every worker listed in
    ``cluster.scheduler_info["workers"]`` must report the running status and
    the scheduler must hold four workers.
    """
    async with LocalCluster(
        n_workers=2, asynchronous=True, dashboard_address=":0"
    ) as cluster:
        assert len(cluster.scheduler.workers) == 2
        cluster.scale(4)
        await cluster.wait_for_workers(4)
        # Only the per-worker info is inspected, so iterate the values and
        # feed ``all`` a generator rather than building a throwaway list.
        assert all(
            worker["status"] == Status.running.name
            for worker in cluster.scheduler_info["workers"].values()
        )
        assert len(cluster.scheduler.workers) == 4


@gen_test()
async def test_deprecated_loop_properties():
    """Check the deprecation warning recorded for a subclass that sets ``loop``.

    ``ExampleCluster`` assigns both ``loop`` and ``io_loop`` in its
    constructor; exactly one ``DeprecationWarning`` with the message
    "setting the loop property is deprecated" is expected to be recorded.
    """

    class ExampleCluster(Cluster):
        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            current_loop = IOLoop.current()
            # Same target order as a chained assignment: ``loop`` first,
            # then ``io_loop``.
            self.loop = current_loop
            self.io_loop = current_loop

    with pytest.warns(DeprecationWarning) as captured:
        async with ExampleCluster(asynchronous=True, loop=IOLoop.current()):
            pass

    observed = [(warning.category, *warning.message.args) for warning in captured]
    expected = [(DeprecationWarning, "setting the loop property is deprecated")]
    assert observed == expected


def test_exponential_backoff():
    """Check ``_exponential_backoff`` against a table of expected results.

    The expected values are consistent with ``1.5 * 3 ** attempt`` capped at
    20 (presumably the signature is attempt, multiplier, base, maximum;
    verify against the helper's definition).
    """
    # Trailing positional arguments shared by every call.
    fixed_args = (1.5, 3, 20)
    cases = [
        (0, 1.5),
        (1, 4.5),
        (2, 13.5),
        (5, 20),
        # avoid overflow
        (1000, 20),
    ]
    for attempt, expected in cases:
        assert _exponential_backoff(attempt, *fixed_args) == expected


@gen_test()
async def test_sync_context_manager_used_with_async_cluster():
    """A plain ``with`` on an asynchronous ``Cluster`` must raise ``TypeError``."""
    message = r"Used 'with' with asynchronous class; please use 'async with'"
    async with Cluster(asynchronous=True, name="A") as cluster:
        with pytest.raises(TypeError, match=message):
            # Entering the cluster synchronously is the operation under test.
            with cluster:
                pass