File: test_computations.py

package info (click to toggle)
dask.distributed 2024.12.1+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,588 kB
  • sloc: python: 96,954; javascript: 1,549; sh: 390; makefile: 220
file content (92 lines) | stat: -rw-r--r-- 2,941 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
"""Tests for distributed.scheduler.Computation objects"""

from __future__ import annotations

import pytest

from distributed import Event, Worker, secede
from distributed.utils_test import async_poll_for, gen_cluster, inc, wait_for_state


@gen_cluster(client=True)
async def test_computations(c, s, a, b):
    """Two separate ``persist`` calls are recorded as two Computations.

    The first Computation must contain the addition group (and not the
    subtraction group), the second must contain the subtraction group,
    the most recent one must carry the latest task-group stop time, and
    the first must report all of its output chunks in memory.
    """
    pytest.importorskip("numpy")
    da = pytest.importorskip("dask.array")

    ones = da.ones(100, chunks=(10,))

    added = (ones + 1).persist()
    await added
    subtracted = (ones - 2).persist()
    await subtracted

    assert len(s.computations) == 2
    first, second = s.computations

    first_groups = str(first.groups)
    second_groups = str(second.groups)
    assert "add" in first_groups
    assert "sub" in second_groups
    assert "sub" not in first_groups

    assert isinstance(repr(second), str)

    # The newest computation finished last, so its stop time matches the
    # latest stop time of any task group on the scheduler.
    assert second.stop == max(group.stop for group in s.task_groups.values())

    assert first.states["memory"] == added.npartitions


@gen_cluster(client=True)
async def test_computations_futures(c, s, a, b):
    """Individually submitted futures and a reduction over them form one Computation."""
    inputs = [c.submit(inc, n) for n in range(10)]
    result = c.submit(sum, inputs)
    await result

    # Unpacking asserts that exactly one Computation was recorded.
    [computation] = s.computations
    group_names = str(computation.groups)
    for expected in ("sum", "inc"):
        assert expected in group_names


@gen_cluster(client=True, nthreads=[])
async def test_computations_no_workers(c, s):
    """Tasks stuck for lack of workers must not start a second Computation.

    Both tasks are submitted while the cluster has no workers, so each one
    waits in ``queued`` or ``no-worker`` and the scheduler reports zero
    occupancy. Once a worker joins and both tasks finish, they must still
    share a single Computation.
    """
    pending = {}
    # Submit and wait one key at a time, in order: x first, then y.
    for key, arg in (("x", 1), ("y", 2)):
        pending[key] = c.submit(inc, arg, key=key)
        await wait_for_state(key, ("queued", "no-worker"), s)
    assert s.total_occupancy == 0

    async with Worker(s.address):
        for key, expected in (("x", 2), ("y", 3)):
            assert await pending[key] == expected
        [computation] = s.computations
        groups = s.task_groups
        assert computation.groups == {groups["x"], groups["y"]}


@gen_cluster(client=True)
async def test_computations_no_resources(c, s, a, b):
    """A task stuck for lack of resources must not start a second Computation.

    Task ``x`` needs resource ``A``, which no current worker provides, so it
    sits in ``no-worker``. Meanwhile an unrestricted task ``y`` runs to
    completion. After a worker offering ``A`` joins and ``x`` finishes, both
    tasks must belong to the same single Computation.
    """
    restricted = c.submit(inc, 1, key="x", resources={"A": 1})
    await wait_for_state("x", "no-worker", s)

    unrestricted = c.submit(inc, 2, key="y")
    assert await unrestricted == 3
    assert s.total_occupancy == 0

    async with Worker(s.address, resources={"A": 1}):
        assert await restricted == 2
        [computation] = s.computations
        expected_groups = {s.task_groups[name] for name in ("x", "y")}
        assert computation.groups == expected_groups


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_computations_long_running(c, s, a):
    """Don't create new computations if there are long-running tasks.

    Task ``x`` secedes from the worker's thread pool and blocks on a
    distributed ``Event``. It becomes ``long-running`` and the scheduler's
    total occupancy drops to zero. Task ``y`` is submitted in the meantime
    and must join the same Computation as ``x`` rather than start a new one.
    """
    ev = Event()

    def func(ev):
        # Leave the worker's (single-thread) pool so ``y`` can still run,
        # then block until the test sets the event.
        secede()
        ev.wait()

    x = c.submit(func, ev, key="x")
    try:
        await wait_for_state("x", "long-running", a)
        await async_poll_for(lambda: s.total_occupancy == 0, timeout=5)
        y = c.submit(inc, 1, key="y")
        assert await y == 2
    finally:
        # Always release the seceded thread, even if a wait times out or an
        # assertion above fails. Otherwise it would stay blocked in
        # ``ev.wait()`` and could hang or be reported as a leaked thread
        # during cluster teardown, obscuring the real failure.
        await ev.set()
    await x
    [computation] = s.computations
    assert computation.groups == {s.task_groups["x"], s.task_groups["y"]}