File: test_scale_down.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (96 lines) | stat: -rw-r--r-- 2,736 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import logging
import time

import pytest

import parsl
from parsl import File, python_app
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SingleNodeLauncher
from parsl.providers import LocalProvider

logger = logging.getLogger(__name__)

_max_blocks = 5
_min_blocks = 2


def local_config():
    return Config(
        executors=[
            HighThroughputExecutor(
                heartbeat_period=1,
                heartbeat_threshold=2,
                poll_period=100,
                label="htex_local",
                address="127.0.0.1",
                max_workers_per_node=1,
                encrypted=True,
                provider=LocalProvider(
                    init_blocks=0,
                    max_blocks=_max_blocks,
                    min_blocks=_min_blocks,
                    launcher=SingleNodeLauncher(),
                ),
            )
        ],
        max_idletime=0.5,
        strategy='simple',
    )


@python_app
def waiting_app(ident: int, inputs=(), outputs=()):
    import pathlib
    import time

    # Approximate an Event by writing to files; the test logic will poll this file
    with open(outputs[0], "a") as f:
        f.write(f"Ready: {ident}\n")

    # Similarly, use Event approximation (file check!) by polling.
    may_finish_file = pathlib.Path(inputs[0])
    while not may_finish_file.exists():
        time.sleep(0.01)


# see issue #1885 for details of failures of this test.
# at the time of issue #1885 this test was failing frequently
# in CI.
@pytest.mark.local
def test_scale_out(tmpd_cwd, try_assert):
    dfk = parsl.dfk()

    num_managers = len(dfk.executors['htex_local'].connected_managers())

    assert num_managers == 0, "Expected 0 managers at start"
    assert dfk.executors['htex_local'].outstanding == 0, "Expected 0 tasks at start"

    ntasks = 10
    ready_path = tmpd_cwd / "workers_ready"
    finish_path = tmpd_cwd / "workers_may_continue"
    ready_path.touch()
    inputs = [File(finish_path)]
    outputs = [File(ready_path)]

    futs = [waiting_app(i, outputs=outputs, inputs=inputs) for i in range(ntasks)]

    while ready_path.read_text().count("\n") < _max_blocks:
        time.sleep(0.5)

    assert len(dfk.executors['htex_local'].connected_managers()) == _max_blocks

    finish_path.touch()  # Approximation of Event, via files
    [x.result() for x in futs]

    assert dfk.executors['htex_local'].outstanding == 0

    def assert_kernel():
        return len(dfk.executors['htex_local'].connected_managers()) == _min_blocks

    try_assert(
        assert_kernel,
        fail_msg=f"Expected {_min_blocks} managers when no tasks (min_blocks)",
        timeout_ms=15000,
    )