"""Tests for MPIExecutor resource-specification handling with a local provider."""
import logging
import os
import random
from typing import Dict
import pytest
import parsl
from parsl import Config, bash_app, python_app
from parsl.executors import MPIExecutor
from parsl.executors.errors import InvalidResourceSpecification
from parsl.launchers import SimpleLauncher
from parsl.providers import LocalProvider
# Label assigned to the MPIExecutor that local_setup() configures.
EXECUTOR_LABEL = "MPI_TEST"
def local_setup():
    """Load a Parsl configuration containing a single MPIExecutor.

    The executor runs on a LocalProvider with a SimpleLauncher, and the
    worker environment exports ``PBS_NODEFILE`` pointing at the
    ``mocks/pbs_nodefile`` file located next to this module.
    """
    test_dir = os.path.abspath(os.path.dirname(__file__))
    pbs_nodefile = os.path.join(test_dir, "mocks", "pbs_nodefile")

    provider = LocalProvider(
        worker_init=f"export PBS_NODEFILE={pbs_nodefile}",
        launcher=SimpleLauncher(),
    )
    executor = MPIExecutor(
        label=EXECUTOR_LABEL,
        max_workers_per_block=2,
        mpi_launcher="mpiexec",
        provider=provider,
    )
    parsl.load(Config(executors=[executor]))
def local_teardown():
    """Clean up the DataFlowKernel returned by ``parsl.dfk()``."""
    dfk = parsl.dfk()
    dfk.cleanup()
@python_app
def get_env_vars(parsl_resource_specification: Dict = {}) -> Dict:
    """Return the environment variables whose names start with ``PARSL_``.

    The body never reads ``parsl_resource_specification``.
    NOTE(review): presumably the argument is consumed by Parsl itself when
    the app is submitted -- confirm against the Parsl documentation.
    """
    # Imported inside the body, as in every app in this module.
    import os

    return {
        name: value
        for name, value in os.environ.items()
        if name.startswith("PARSL_")
    }
@pytest.mark.local
def test_only_resource_specs_set():
    """Confirm that resource-spec env vars and MPI launch prefixes are both set.

    Runs ``get_env_vars`` with a two-node, two-ranks-per-node resource
    specification and checks the ``PARSL_*`` variables it reports:

    * ``PARSL_MPI_PREFIX`` and ``PARSL_MPIEXEC_PREFIX`` exist and are equal;
    * ``PARSL_NUM_NODES`` and ``PARSL_RANKS_PER_NODE`` match the
      specification (as strings);
    * ``PARSL_NUM_RANKS`` equals ``ranks_per_node * num_nodes``.
    """
    resource_spec = {
        "num_nodes": 2,
        "ranks_per_node": 2,
    }

    future = get_env_vars(parsl_resource_specification=resource_spec)

    result = future.result()
    assert isinstance(result, dict)

    # Log the whole table first so a failing assertion below is easy to diagnose.
    logging.warning(f"Got table: {result}")

    assert "PARSL_MPI_PREFIX" in result
    assert "PARSL_MPIEXEC_PREFIX" in result
    assert result["PARSL_MPI_PREFIX"] == result["PARSL_MPIEXEC_PREFIX"]

    # Environment values are strings, so compare against str() of the spec.
    assert result["PARSL_NUM_NODES"] == str(resource_spec["num_nodes"])
    assert result["PARSL_RANKS_PER_NODE"] == str(resource_spec["ranks_per_node"])
    assert result["PARSL_NUM_RANKS"] == str(resource_spec["ranks_per_node"] * resource_spec["num_nodes"])
@bash_app
def echo_launch_cmd(
    parsl_resource_specification: Dict,
    stdout=parsl.AUTO_LOGNAME,
    stderr=parsl.AUTO_LOGNAME,
):
    """Return a shell command that echoes ``$PARSL_MPI_PREFIX`` followed by ``hostname``.

    None of the parameters are read by the body.
    """
    command = 'echo "$PARSL_MPI_PREFIX hostname"'
    return command
@pytest.mark.local
def test_bash_default_prefix_set():
    """Confirm that ``$PARSL_MPI_PREFIX`` expands to an ``mpiexec`` launch command.

    Runs ``echo_launch_cmd`` with a two-node, two-ranks-per-node resource
    specification, expects exit code 0, and checks that the first line
    written to the app's stdout file starts with ``mpiexec``.
    """
    resource_spec = {
        "num_nodes": 2,
        "ranks_per_node": 2,
    }

    future = echo_launch_cmd(parsl_resource_specification=resource_spec)

    result = future.result()
    assert result == 0

    with open(future.stdout) as f:
        output = f.readlines()

    # Log before asserting so the captured output is recorded even on failure.
    logging.warning(f"output : {output}")

    # Fail with a clear message rather than an IndexError on empty stdout.
    assert output, "echo_launch_cmd wrote nothing to stdout"
    assert output[0].startswith("mpiexec")
@pytest.mark.local
def test_bash_multiple_set():
    """Confirm that multiple apps can run without blocking each other out.

    Submits four ``echo_launch_cmd`` tasks requesting 1, 2, 3 and 4 nodes
    (each with ``"num_ranks": 4``) before waiting on any of them. Every task
    must exit with 0 and write a first stdout line starting with ``mpiexec``.
    """
    futures = []
    for num_nodes in range(1, 5):
        # Each submission gets its own dict, so no task can see a later
        # iteration's num_nodes through a shared, mutated object.
        resource_spec = {
            "num_nodes": num_nodes,
            "num_ranks": 4,
        }
        future = echo_launch_cmd(parsl_resource_specification=resource_spec)
        futures.append(future)

    for future in futures:
        result = future.result()
        assert result == 0

        with open(future.stdout) as f:
            output = f.readlines()
        assert output[0].startswith("mpiexec")
@bash_app
def bash_resource_spec(parsl_resource_specification=None, stdout=parsl.AUTO_LOGNAME):
    """Return a shell command that echoes ``ranks_per_node * num_nodes``.

    Both values are read from ``parsl_resource_specification``. The body
    subscripts it directly, so a mapping with both keys must be passed.
    """
    ranks_per_node = parsl_resource_specification["ranks_per_node"]
    num_nodes = parsl_resource_specification["num_nodes"]
    total_ranks = ranks_per_node * num_nodes
    return f'echo "{total_ranks}"'
@pytest.mark.local
def test_bash_app_using_resource_spec():
    """Check that a bash app can read its resource specification.

    ``bash_resource_spec`` echoes ``ranks_per_node * num_nodes``; the first
    line of its stdout file must parse to that product.
    """
    num_nodes, ranks_per_node = 2, 2
    spec = {
        "num_nodes": num_nodes,
        "ranks_per_node": ranks_per_node,
    }

    future = bash_resource_spec(parsl_resource_specification=spec)
    assert future.result() == 0

    with open(future.stdout) as f:
        first_line = f.readlines()[0]

    assert int(first_line.strip()) == spec["num_nodes"] * spec["ranks_per_node"]
@python_app
def mock_app(sleep_dur: float = 0.0, parsl_resource_specification: Dict = {}):
    """Sleep, then report the rank count and node list seen in the environment.

    Returns a ``(total_ranks, nodes)`` tuple where ``total_ranks`` is
    ``PARSL_NUM_NODES * PARSL_RANKS_PER_NODE`` and ``nodes`` is
    ``PARSL_MPI_NODELIST`` split on commas. The body does not read
    ``parsl_resource_specification``.
    """
    import os
    import time

    time.sleep(sleep_dur)

    env = os.environ
    num_nodes = int(env["PARSL_NUM_NODES"])
    ranks_per_node = int(env["PARSL_RANKS_PER_NODE"])
    node_list = env["PARSL_MPI_NODELIST"].split(',')
    return num_nodes * ranks_per_node, node_list
@pytest.mark.local
def test_simulated_load(rounds: int = 100):
    """Submit ``rounds`` randomly sized ``mock_app`` tasks and verify each result.

    Every task gets a random node count, ranks-per-node value and sleep
    duration. Once all tasks are submitted, each result must report as many
    nodes as requested and a rank total of ``num_nodes * ranks_per_node``.
    """
    node_choices = (1, 2, 4)
    sleep_choices = (0, 0.01, 0.02, 0.04)
    ranks_per_node = (4, 8)

    # Maps each submitted future to the resource spec it was launched with.
    submitted = {}
    for _ in range(rounds):
        spec = {
            "num_nodes": random.choice(node_choices),
            "ranks_per_node": random.choice(ranks_per_node),
        }
        sleep_dur = random.choice(sleep_choices)
        task = mock_app(sleep_dur=sleep_dur, parsl_resource_specification=spec)
        submitted[task] = spec

    for task, spec in submitted.items():
        total_ranks, nodes = task.result(timeout=10)
        assert len(nodes) == spec["num_nodes"]
        assert total_ranks == spec["num_nodes"] * spec["ranks_per_node"]
@pytest.mark.local
def test_missing_resource_spec():
    """Expect InvalidResourceSpecification when ``mock_app`` gets no resource spec."""
    with pytest.raises(InvalidResourceSpecification):
        # Submission and result retrieval both sit inside the context manager,
        # so the exception is accepted from either step.
        mock_app(sleep_dur=0.4).result(timeout=10)
|