File: test_connected_blocks.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (62 lines) | stat: -rw-r--r-- 1,640 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import pytest

import parsl
from parsl import Config
from parsl.executors import HighThroughputExecutor
from parsl.providers import LocalProvider


def local_config():
    return Config(
        executors=[
            HighThroughputExecutor(
                label="HTEX",
                heartbeat_period=1,
                heartbeat_threshold=2,
                poll_period=100,
                address="127.0.0.1",
                max_workers_per_node=1,
                provider=LocalProvider(
                    init_blocks=0,
                    max_blocks=2,
                    min_blocks=0,
                ),
            )
        ],
        max_idletime=0.5,
        strategy='htex_auto_scale',
    )


@parsl.python_app
def double(x):
    return x * 2


@pytest.mark.local
def test_get_connected_blocks():
    """Test reporting of connected blocks from HTEX"""
    dfk = parsl.dfk()
    executor = dfk.executors["HTEX"]

    connected_blocks = executor.connected_blocks()
    assert not connected_blocks, "Expected 0 blocks"

    blocking_task = double(5).result()
    assert blocking_task == 10

    connected_blocks = executor.connected_blocks()
    assert len(connected_blocks) == 1, "Expected 1 block"

    executor.scale_in(1)

    connected_blocks = executor.connected_blocks()
    assert len(connected_blocks) == 1, "Expected 1 block"

    blocking_task = double(5).result()
    assert blocking_task == 10

    # With the first block scaled_in there should be reporting
    # 2 blocks including the older one.
    connected_blocks = executor.connected_blocks()
    assert len(connected_blocks) == 2, "Expected 2 blocks"