File: test_mpi_mode_enabled.py

package info (click to toggle)
python-parsl 2025.01.13+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (188 lines) | stat: -rw-r--r-- 5,423 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import logging
import os
import random
from typing import Dict

import pytest

import parsl
from parsl import Config, bash_app, python_app
from parsl.executors import MPIExecutor
from parsl.executors.errors import InvalidResourceSpecification
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider

# Label assigned to the MPIExecutor that local_setup() configures.
EXECUTOR_LABEL = "MPI_TEST"


def local_setup():
    """Load a Parsl config containing one MPIExecutor backed by a LocalProvider.

    Workers are initialised with ``PBS_NODEFILE`` exported and pointing at the
    ``mocks/pbs_nodefile`` file located next to this test module.
    """
    module_dir = os.path.dirname(__file__)
    pbs_nodefile = os.path.join(os.path.abspath(module_dir), "mocks", "pbs_nodefile")

    provider = LocalProvider(
        worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
        launcher=SimpleLauncher(),
    )
    mpi_executor = MPIExecutor(
        label=EXECUTOR_LABEL,
        max_workers_per_block=2,
        mpi_launcher="mpiexec",
        provider=provider,
    )

    parsl.load(Config(executors=[mpi_executor]))


def local_teardown():
    """Clean up the currently loaded Parsl DataFlowKernel."""
    active_dfk = parsl.dfk()
    active_dfk.cleanup()


@python_app
def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict:
    """Return the environment variables whose names start with ``PARSL_``.

    ``parsl_resource_specification`` is never read by the body; presumably it
    is consumed by Parsl when the app is invoked (confirm against Parsl docs).
    """
    import os

    return {
        name: value
        for name, value in os.environ.items()
        if name.startswith("PARSL_")
    }


@pytest.mark.local
def test_only_resource_specs_set():
    """Check the ``PARSL_*`` environment seen by an app given a resource spec.

    Both launch-prefix variables must be present and equal to each other, and
    the node/rank variables must match the submitted resource specification.
    """
    spec = {
        "num_nodes": 2,
        "ranks_per_node": 2,
    }

    result = get_env_vars(parsl_resource_specification=spec).result()
    assert isinstance(result, Dict)
    logging.warning(f"Got table: {result}")

    # Launch prefixes: both exposed, and identical.
    assert "PARSL_MPI_PREFIX" in result
    assert "PARSL_MPIEXEC_PREFIX" in result
    assert result["PARSL_MPI_PREFIX"] == result["PARSL_MPIEXEC_PREFIX"]

    # Resource-spec values are exported as strings.
    node_count = spec["num_nodes"]
    ranks_each = spec["ranks_per_node"]
    assert result["PARSL_NUM_NODES"] == str(node_count)
    assert result["PARSL_RANKS_PER_NODE"] == str(ranks_each)
    assert result["PARSL_NUM_RANKS"] == str(ranks_each * node_count)


@bash_app
def echo_launch_cmd(
    parsl_resource_specification: Dict,
    stdout=parsl.AUTO_LOGNAME,
    stderr=parsl.AUTO_LOGNAME,
):
    """Return a shell command that echoes ``$PARSL_MPI_PREFIX`` followed by ``hostname``.

    None of the parameters are read by the body.
    """
    command = 'echo "$PARSL_MPI_PREFIX hostname"'
    return command


@pytest.mark.local
def test_bash_default_prefix_set():
    """Check that ``$PARSL_MPI_PREFIX`` expands to an ``mpiexec`` command.

    The bash app must exit with status 0 and the first line of its stdout
    must start with ``mpiexec``.
    """
    spec = {"num_nodes": 2, "ranks_per_node": 2}
    app_future = echo_launch_cmd(parsl_resource_specification=spec)

    assert app_future.result() == 0
    with open(app_future.stdout) as stdout_file:
        output = stdout_file.readlines()
        assert output[0].startswith("mpiexec")
        logging.warning(f"output : {output}")


@pytest.mark.local
def test_bash_multiple_set():
    """Confirm that multiple apps can run without blocking each other out.

    Four bash apps are submitted, requesting 1, 2, 3 and 4 nodes respectively
    (each with ``num_ranks`` of 4). Every app must exit with status 0 and the
    first line of its stdout must start with ``mpiexec``.
    """
    futures = []
    for num_nodes in range(1, 5):
        # Build a fresh spec for every submission. Mutating one shared dict
        # would leave all four invocations aliasing the same object, so a
        # later update could alter a spec that has already been handed off.
        resource_spec = {
            "num_nodes": num_nodes,
            "num_ranks": 4,
        }
        futures.append(echo_launch_cmd(parsl_resource_specification=resource_spec))

    for future in futures:
        assert future.result() == 0
        with open(future.stdout) as f:
            output = f.readlines()
        # Fail with a clear assertion (not an IndexError) if nothing was written.
        assert output, f"no output captured in {future.stdout}"
        assert output[0].startswith("mpiexec")


@bash_app
def bash_resource_spec(parsl_resource_specification=None, stdout=parsl.AUTO_LOGNAME):
    """Return a shell command echoing ``ranks_per_node * num_nodes`` from the spec.

    The body subscripts ``parsl_resource_specification`` directly, so it must
    be a mapping containing both keys; the ``None`` default is not usable here.
    """
    spec = parsl_resource_specification
    ranks_each = spec["ranks_per_node"]
    node_count = spec["num_nodes"]
    total_ranks = ranks_each * node_count
    return f'echo "{total_ranks}"'


@pytest.mark.local
def test_bash_app_using_resource_spec():
    """Check that a bash app can read its resource specification.

    The app echoes ``ranks_per_node * num_nodes``; the first line of its
    stdout must parse to that same product.
    """
    spec = {"num_nodes": 2, "ranks_per_node": 2}
    expected_ranks = spec["num_nodes"] * spec["ranks_per_node"]

    app_future = bash_resource_spec(parsl_resource_specification=spec)
    assert app_future.result() == 0

    with open(app_future.stdout) as stdout_file:
        first_line = stdout_file.readlines()[0]
    assert int(first_line.strip()) == expected_ranks


@python_app
def mock_app(sleep_dur: float = 0.0, parsl_resource_specification: Dict = {}):
    """Sleep, then report the rank count and node list found in the environment.

    Returns a ``(total_ranks, nodes)`` tuple, where ``total_ranks`` is
    ``PARSL_NUM_NODES * PARSL_RANKS_PER_NODE`` and ``nodes`` is
    ``PARSL_MPI_NODELIST`` split on commas. ``parsl_resource_specification``
    is not read by the body.
    """
    import os
    import time

    time.sleep(sleep_dur)

    env = os.environ
    node_count = int(env["PARSL_NUM_NODES"])
    ranks_each = int(env["PARSL_RANKS_PER_NODE"])
    node_list = env["PARSL_MPI_NODELIST"].split(',')

    return node_count * ranks_each, node_list


@pytest.mark.local
def test_simulated_load(rounds: int = 100):
    """Submit ``rounds`` randomly sized apps and verify what each one reports.

    Each app gets a random node count, ranks-per-node and sleep duration.
    Its result must list as many nodes as requested and a total rank count
    equal to ``num_nodes * ranks_per_node``.
    """
    node_choices = (1, 2, 4)
    sleep_choices = (0, 0.01, 0.02, 0.04)
    ranks_per_node = (4, 8)

    # Maps each submitted future to the spec it was launched with.
    submitted = {}
    for _ in range(rounds):
        spec = {
            "num_nodes": random.choice(node_choices),
            "ranks_per_node": random.choice(ranks_per_node),
        }
        nap = random.choice(sleep_choices)
        app_future = mock_app(sleep_dur=nap, parsl_resource_specification=spec)
        submitted[app_future] = spec

    for app_future, spec in submitted.items():
        total_ranks, nodes = app_future.result(timeout=10)
        assert len(nodes) == spec["num_nodes"]
        assert total_ranks == spec["num_nodes"] * spec["ranks_per_node"]


@pytest.mark.local
def test_missing_resource_spec():
    """Invoking ``mock_app`` without a resource spec must raise InvalidResourceSpecification."""
    # Submission and result retrieval both stay inside the context manager,
    # so the error is caught whichever of the two steps raises it.
    with pytest.raises(InvalidResourceSpecification):
        mock_app(sleep_dur=0.4).result(timeout=10)