File: test_slow_adaptive.py

package info (click to toggle)
dask.distributed 2022.12.1%2Bds.1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 10,164 kB
  • sloc: python: 81,938; javascript: 1,549; makefile: 228; sh: 100
file content (103 lines) | stat: -rw-r--r-- 2,910 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from __future__ import annotations

import asyncio

import pytest

from dask.distributed import Client, Scheduler, SpecCluster, Worker

from distributed.metrics import time
from distributed.utils_test import gen_test, slowinc


class SlowWorker:
    """Wrapper around a ``Worker`` whose start is deferred.

    Awaiting an instance does not wait for the wrapped worker to start: it
    only schedules ``worker.start`` on the worker's loop through
    ``call_later`` and immediately hands back the wrapper itself.

    Parameters
    ----------
    *args, **kwargs
        Forwarded unchanged to ``Worker``.
    delay
        Delay passed to ``worker.loop.call_later`` before ``worker.start``
        is invoked.
    """

    def __init__(self, *args, delay=0, **kwargs):
        self.worker = Worker(*args, **kwargs)
        # ``status`` is None until first awaited, then "running", and
        # "closed" once ``close`` has completed.
        self.status = None
        self.delay = delay

    @property
    def address(self):
        """Address reported by the wrapped worker."""
        return self.worker.address

    async def _schedule_start(self):
        """Queue the wrapped worker's start unless already running; return self."""
        if self.status == "running":
            return self
        loop = self.worker.loop
        loop.call_later(self.delay, self.worker.start)
        self.status = "running"
        return self

    def __await__(self):
        return self._schedule_start().__await__()

    async def close(self):
        """Close the wrapped worker, then mark this wrapper as closed."""
        await self.worker.close()
        self.status = "closed"


# Scheduler spec handed to every SpecCluster created in this module.
scheduler = {
    "cls": Scheduler,
    "options": {"dashboard_address": ":0"},
}


@gen_test()
async def test_startup():
    """Entering the cluster context must not wait for a slow-starting worker.

    Worker 1 is a ``SlowWorker`` created with ``delay=120``, yet the
    ``async with`` body has to be reached while ``time()`` is still below the
    starting timestamp plus 60, with all three worker objects present in
    ``cluster.workers``.
    """
    began = time()
    worker_specs = {
        0: {"cls": Worker, "options": {}},
        1: {"cls": SlowWorker, "options": {"delay": 120}},
        2: {"cls": SlowWorker, "options": {"delay": 0}},
    }
    async with SpecCluster(
        scheduler=scheduler, workers=worker_specs, asynchronous=True
    ) as cluster:
        n_created = len(cluster.workers)
        n_specified = len(cluster.worker_spec)
        assert n_created == n_specified == 3
        assert time() < began + 60

        # No more than two workers may be known to the scheduler here;
        # presumably the delay=120 worker has not registered yet.
        n_registered = len(cluster.scheduler_info["workers"])
        assert 0 <= n_registered <= 2

        async with Client(cluster, asynchronous=True) as client:
            await client.wait_for_workers(n_workers=2)


@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_test()
async def test_scale_up_down():
    """Scaling down should drop the worker that has not started yet.

    The cluster is created with a ``"slow"`` ``SlowWorker`` (``delay=5``) and
    a ``"fast"`` plain ``Worker``. After ``scale(1)`` only ``"fast"`` is
    expected to remain in ``cluster.worker_spec``; after ``scale(0)`` the
    spec must be empty.
    """
    async with SpecCluster(
        scheduler=scheduler,
        workers={
            "slow": {"cls": SlowWorker, "options": {"delay": 5}},
            "fast": {"cls": Worker, "options": {}},
        },
        asynchronous=True,
    ) as cluster:
        cluster.scale(1)  # remove a worker, hopefully the one we don't have
        await cluster

        assert list(cluster.worker_spec) == ["fast"]

        cluster.scale(0)
        await cluster
        assert not cluster.worker_spec


@gen_test()
async def test_adaptive():
    """Adaptive scaling grows past one worker under load and shrinks back.

    The cluster starts with a single ``"fast"`` ``Worker`` and is given a
    ``SlowWorker`` spec (``delay=5``) through the ``worker=`` argument. With
    adaptivity enabled (``minimum=1``, ``maximum=4``), the test submits 200
    ``slowinc`` tasks, waits for ``cluster.worker_spec`` to hold more than one
    entry, drops the futures, waits for the spec to shrink to at most one
    entry, and finally checks that ``"fast"`` is the only one left.
    """
    async with SpecCluster(
        scheduler=scheduler,
        workers={"fast": {"cls": Worker, "options": {}}},
        worker={"cls": SlowWorker, "options": {"delay": 5}},
        asynchronous=True,
    ) as cluster:
        cluster.adapt(minimum=1, maximum=4, target_duration="1s", interval="20ms")
        async with Client(cluster, asynchronous=True) as client:
            futures = client.map(slowinc, range(200), delay=0.1)

            # Poll until the spec contains more than the initial worker.
            while len(cluster.worker_spec) <= 1:
                await asyncio.sleep(0.05)

            # Drop our reference to the futures; presumably this releases the
            # tasks so that the cluster scales back down.
            del futures

            # Poll until the spec is back to at most one worker.
            while len(cluster.worker_spec) > 1:
                await asyncio.sleep(0.05)

            assert list(cluster.worker_spec) == ["fast"]