File: test_scale_down_htex_auto_scale.py

from threading import Event

import pytest

import parsl
from parsl import File, python_app
from parsl.config import Config
from parsl.executors import HighThroughputExecutor
from parsl.launchers import SingleNodeLauncher
from parsl.providers import LocalProvider

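# Block-count bounds for the LocalProvider below; the htex_auto_scale strategy
# is expected to scale the number of blocks between these limits as load changes.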
_max_blocks = 5
_min_blocks = 0


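# One worker per block (max_workers_per_node=1) plus aggressive timings
# (max_idletime=0.5, strategy_period=0.1) so that scale-up and scale-down
# decisions happen quickly enough for a short test run.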
def local_config():
    return Config(
        executors=[
            HighThroughputExecutor(
                heartbeat_period=1,
                heartbeat_threshold=2,
                poll_period=100,
                label="htex_local",
                address="127.0.0.1",
                max_workers_per_node=1,
                encrypted=True,
                provider=LocalProvider(
                    init_blocks=0,
                    max_blocks=_max_blocks,
                    min_blocks=_min_blocks,
                    launcher=SingleNodeLauncher(),
                ),
            )
        ],
        max_idletime=0.5,
        strategy='htex_auto_scale',
        strategy_period=0.1
    )
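

# Note: parsl's pytest harness is expected to pick up ``local_config`` for
# tests marked ``local``; loading the same config by hand would look roughly
# like ``parsl.load(local_config())`` followed later by ``parsl.dfk().cleanup()``.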


@python_app
def waiting_app(ident: int, outputs=(), inputs=()):
    import pathlib
    import time

    # Approximate an Event by writing to files; the test logic will poll this file
    with open(outputs[0], "a") as f:
        f.write(f"Ready: {ident}\n")

    # Similarly, approximate an Event by polling for the existence of a file.
    may_finish_file = pathlib.Path(inputs[0])
    while not may_finish_file.exists():
        time.sleep(0.01)


# See issue #1885 for details of failures of this test; at the time of that
# issue, this test was failing frequently in CI.
@pytest.mark.local
def test_scale_out(tmpd_cwd, try_assert):
    dfk = parsl.dfk()

    num_managers = len(dfk.executors['htex_local'].connected_managers())

    assert num_managers == 0, "Expected 0 managers at start"
    assert dfk.executors['htex_local'].outstanding == 0, "Expected 0 tasks at start"

    ntasks = _max_blocks * 2
    ready_path = tmpd_cwd / "workers_ready"
    finish_path = tmpd_cwd / "stage1_workers_may_continue"
    ready_path.touch()
    inputs = [File(finish_path)]
    outputs = [File(ready_path)]
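    # The File objects carry the paths used for the file-based Event
    # approximation: the "ready" file exists up front and is appended to by
    # each running task, while the "finish" file is created later to release them.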

    futs = [waiting_app(i, outputs=outputs, inputs=inputs) for i in range(ntasks)]

    try_assert(lambda: ready_path.read_text().count("\n") == _max_blocks, "Wait for _max_blocks tasks to be running", timeout_ms=15000)

    # This should be true immediately, because the previous try_assert should
    # have waited until there are max_blocks tasks running, and this test is
    # configured to use 1 worker per block.
    assert len(dfk.executors['htex_local'].connected_managers()) == _max_blocks

    finish_path.touch()  # Approximation of Event, via files
    [x.result() for x in futs]

    assert dfk.executors['htex_local'].outstanding == 0

    # Now we can launch one "long" task - the connected_managers count should
    # eventually converge to 1 and stay there.

    finish_path = tmpd_cwd / "stage2_workers_may_continue"

    fut = waiting_app(0, outputs=outputs, inputs=[File(finish_path)])

    def check_one_block():
        return len(dfk.executors['htex_local'].connected_managers()) == 1

    try_assert(
        check_one_block,
        fail_msg="Expected 1 manager during a single long task",
    )

    # the task should not have finished by the time we end up with 1 manager
    assert not fut.done()

    # This section waits for the strategy to run again, with the above single
    # task outstanding, and checks that the strategy has not scaled up or
    # down further on those subsequent iterations.

    # It does this by hooking the callback of the job status poller, and
    # waiting until it has run.

    old_cb = dfk.job_status_poller.callback

    strategy_iterated = Event()

    def hook_cb(*args, **kwargs):
        r = old_cb(*args, **kwargs)
        strategy_iterated.set()
        return r

    dfk.job_status_poller.callback = hook_cb

    # The strategy is already configured (via strategy_period above) to run
    # frequently, so the hooked callback should fire promptly.

    try_assert(
        strategy_iterated.is_set,
        fail_msg="Expected strategy to have run within this period",
    )

    assert check_one_block()

    finish_path.touch()  # now we can end the single stage-2 task

    fut.result()

    # now we should expect min_blocks scale down

    def check_min_blocks():
        return len(dfk.executors['htex_local'].connected_managers()) == _min_blocks

    try_assert(
        check_min_blocks,
        fail_msg=f"Expected {_min_blocks} managers when no tasks (min_blocks)",
    )
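

# Illustrative only: ``try_assert`` used above is a fixture provided by parsl's
# test suite that repeatedly evaluates a predicate until it holds or a timeout
# expires. A minimal sketch of a similar helper (hypothetical name
# ``_poll_until``; not the real fixture implementation) might look like this:
def _poll_until(predicate, fail_msg, timeout_ms=5000, poll_period_s=0.05):
    import time

    deadline = time.monotonic() + timeout_ms / 1000.0
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(poll_period_s)
    raise AssertionError(fail_msg)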