File: mpi_prefix_composer.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (136 lines) | stat: -rw-r--r-- 4,308 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
from typing import Dict, List, Tuple

from parsl.executors.errors import InvalidResourceSpecification

logger = logging.getLogger(__name__)

# Names of the MPI launchers this module knows how to build command prefixes
# for (see the compose_*_launch_cmd functions below).
VALID_LAUNCHERS = ("srun", "aprun", "mpiexec")


def validate_resource_spec(resource_spec: Dict[str, str]) -> None:
    """Basic validation of keys in the resource_spec.

    Only the keys ``ranks_per_node``, ``num_nodes``, ``num_ranks`` and
    ``launcher_options`` are accepted.

    When ``num_nodes`` is present and exactly one of ``num_ranks`` /
    ``ranks_per_node`` is set, the missing value is derived and written back
    into ``resource_spec`` as a string. Note that this function therefore
    mutates its argument.

    Raises: InvalidResourceSpecification if the resource_spec
        is empty or contains invalid keys
    """
    user_keys = set(resource_spec.keys())

    # empty resource_spec when mpi_mode is set causes parsl to hang
    # ref issue #3427
    if not user_keys:
        raise InvalidResourceSpecification(user_keys,
                                           'MPI mode requires optional parsl_resource_specification keyword argument to be configured')

    legal_keys = {"ranks_per_node", "num_nodes", "num_ranks", "launcher_options"}
    invalid_keys = user_keys - legal_keys
    if invalid_keys:
        raise InvalidResourceSpecification(invalid_keys)

    if "num_nodes" in resource_spec:
        if not resource_spec.get("num_ranks") and resource_spec.get("ranks_per_node"):
            resource_spec["num_ranks"] = str(int(resource_spec["num_nodes"]) * int(resource_spec["ranks_per_node"]))
        if not resource_spec.get("ranks_per_node") and resource_spec.get("num_ranks"):
            num_ranks = int(resource_spec["num_ranks"])
            num_nodes = int(resource_spec["num_nodes"])
            # Integer ceiling division. True division would yield a float
            # string such as "2.0", which is not an integer count as expected
            # by the per-node launcher flags (-ppn / --ntasks-per-node / -N).
            # Rounding down would leave too few slots for every rank when
            # num_ranks is not a multiple of num_nodes.
            resource_spec["ranks_per_node"] = str(-(-num_ranks // num_nodes))
    return


def compose_mpiexec_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``mpiexec`` command prefix for an MPI task.

    The prefix takes ``-n`` from ``num_ranks`` and ``-ppn`` from
    ``ranks_per_node``. It restricts execution to ``node_hostnames``
    (comma-joined) via ``-hosts`` and appends ``launcher_options`` verbatim.

    Every token is passed through ``str()``, so a missing ``num_ranks`` or
    ``ranks_per_node`` shows up as the literal text ``None``. When
    ``launcher_options`` is absent, the prefix ends with a trailing space.

    Returns:
        ``("PARSL_MPIEXEC_PREFIX", prefix)``
    """
    hosts = ",".join(node_hostnames)
    tokens = (
        "mpiexec",
        "-n", resource_spec.get("num_ranks"),
        "-ppn", resource_spec.get("ranks_per_node"),
        "-hosts", hosts,
        resource_spec.get("launcher_options", ""),
    )
    return "PARSL_MPIEXEC_PREFIX", " ".join(map(str, tokens))


def compose_srun_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``srun`` command prefix for an MPI task.

    The prefix sets ``--ntasks`` and ``--ntasks-per-node`` from the resource
    spec. It restricts the step to ``node_hostnames`` via ``--nodelist``,
    sets ``--nodes`` to the number of hostnames, and appends
    ``launcher_options`` verbatim.

    Every token is passed through ``str()``, so a missing ``num_ranks`` or
    ``ranks_per_node`` shows up as the literal text ``None``. When
    ``launcher_options`` is absent, the prefix ends with a trailing space.

    Returns:
        ``("PARSL_SRUN_PREFIX", prefix)``
    """
    node_count = len(node_hostnames)
    lookup = resource_spec.get
    tokens = [
        "srun",
        "--ntasks", lookup("num_ranks"),
        "--ntasks-per-node", lookup("ranks_per_node"),
        "--nodelist", ",".join(node_hostnames),
        "--nodes", node_count,
        lookup("launcher_options", ""),
    ]
    return "PARSL_SRUN_PREFIX", " ".join(map(str, tokens))


def compose_aprun_launch_cmd(
    resource_spec: Dict, node_hostnames: List[str]
) -> Tuple[str, str]:
    """Build the ``aprun`` command prefix for an MPI task.

    The prefix takes ``-n`` from ``num_ranks`` and ``-N`` from
    ``ranks_per_node``. It restricts placement to ``node_hostnames``
    (comma-joined) via ``-node-list`` and appends ``launcher_options``
    verbatim.

    Every token is passed through ``str()``, so a missing ``num_ranks`` or
    ``ranks_per_node`` shows up as the literal text ``None``. When
    ``launcher_options`` is absent, the prefix ends with a trailing space.

    Returns:
        ``("PARSL_APRUN_PREFIX", prefix)``
    """
    ranks = resource_spec.get("num_ranks")
    per_node = resource_spec.get("ranks_per_node")
    extra = resource_spec.get("launcher_options", "")
    pieces = ("aprun", "-n", ranks, "-N", per_node,
              "-node-list", ",".join(node_hostnames), extra)
    return "PARSL_APRUN_PREFIX", " ".join(str(piece) for piece in pieces)


def compose_all(
    mpi_launcher: str, resource_spec: Dict, node_hostnames: List[str]
) -> Dict[str, str]:
    """Compose all launch command prefixes and set the default.

    Runs the aprun, srun and mpiexec composers. Each prefix is stored under
    the key the composer returns (``PARSL_APRUN_PREFIX``,
    ``PARSL_SRUN_PREFIX``, ``PARSL_MPIEXEC_PREFIX``). The prefix belonging
    to ``mpi_launcher`` is then also copied to ``PARSL_MPI_PREFIX``.

    A composer that raises is logged and skipped, so the remaining composers
    still contribute their prefixes.

    Raises:
        RuntimeError: if ``mpi_launcher`` is not ``srun``, ``aprun`` or
            ``mpiexec``.
        KeyError: if the composer for ``mpi_launcher`` itself failed, leaving
            no prefix to use as the default.
    """
    all_prefixes = {}
    composers = [
        compose_aprun_launch_cmd,
        compose_srun_launch_cmd,
        compose_mpiexec_launch_cmd,
    ]
    for composer in composers:
        try:
            key, prefix = composer(resource_spec, node_hostnames)
        except Exception:
            # Best effort: one broken composer must not prevent the others.
            # Log through this module's logger (not the root logger) and let
            # logging do the %-formatting lazily.
            logger.exception(
                "Failed to compose launch prefix with %s from %s",
                composer, resource_spec,
            )
        else:
            all_prefixes[key] = prefix

    # Map each supported launcher to the key its composer produces.
    default_key_for = {
        "srun": "PARSL_SRUN_PREFIX",
        "aprun": "PARSL_APRUN_PREFIX",
        "mpiexec": "PARSL_MPIEXEC_PREFIX",
    }
    if mpi_launcher not in default_key_for:
        raise RuntimeError(f"Unknown mpi_launcher:{mpi_launcher}")
    all_prefixes["PARSL_MPI_PREFIX"] = all_prefixes[default_key_for[mpi_launcher]]

    return all_prefixes