File: runmultinode.py

package info (click to toggle)
libfabric 2.1.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,108 kB
  • sloc: ansic: 387,262; python: 3,171; sh: 2,555; makefile: 1,313; cpp: 617; perl: 474; ruby: 123; asm: 27
file content (209 lines) | stat: -rw-r--r-- 6,229 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#!/usr/bin/env python3

import argparse, builtins, os, sys, yaml, socket

def parse_args():
	"""Define the command-line interface and parse sys.argv.

	Returns:
		(args, parser): the parsed namespace plus the parser itself, so
		callers can print its help text on validation failure.
	"""
	parser = argparse.ArgumentParser(description="libfabric multinode test with slurm")

	# General options as (flags, kwargs) specs, registered in the exact
	# order they appear in --help.
	general = [
		(('--dry-run',), dict(action='store_true', help='Perform a dry run without making any changes.')),
		(("--ci",), dict(type=str, help="Commands to prepend to test call. Only used with the internal launcher option", default="")),
		(("-x", "--capability"), dict(type=str, help="libfabric capability", default="msg")),
		(("-i", "--iterations"), dict(type=int, help="Number of iterations", default=1)),
		(("-l", "--launcher"), dict(type=str, choices=['internal', 'srun', 'mpirun'], help="launcher to use for running job. If nothing is specified, test manages processes internally. Available options: internal, srun and mpirun", default="internal")),
	]
	for flags, opts in general:
		parser.add_argument(*flags, **opts)

	# These live in a "Required" group for help-text purposes only;
	# argparse does not enforce their presence (the script checks later).
	required = parser.add_argument_group("Required arguments")
	required.add_argument("-p", "--provider", type=str, help="libfabric provider")
	required.add_argument("-np", "--num-procs", type=int, help="Map process by node, l3cache, etc")
	required.add_argument("-c", "--config", type=str, help="Test configuration")

	srun_required = parser.add_argument_group("Required if using srun")
	srun_required.add_argument("-t", "--procs-per-node", type=int,
							help="Number of procs per node", default=-1)

	return parser.parse_args(), parser

def parse_config(config):
	"""Load the YAML test configuration file *config* and return its contents."""
	with open(config, "r") as fh:
		return yaml.load(fh, Loader=yaml.FullLoader)

def mpi_env(config):
	"""Render the config's environment variable names as mpirun '-x NAME' flags."""
	return " ".join(f"-x {name}" for name in config['environment'])

def set_env(config):
	"""Export every key/value pair from the config's environment section.

	Values are stringified since os.environ only accepts strings.
	"""
	for name, value in config['environment'].items():
		os.environ[name] = str(value)

def mpi_mca_params(config):
	"""Render the config's optional 'mca' section as mpirun '--mca key value' flags.

	Returns an empty string when the section is absent or empty.
	"""
	# BUG FIX: the original iterated an undefined name ('env') instead of
	# 'mca', raising a NameError that a bare 'except' silently swallowed —
	# so MCA parameters were never actually passed to mpirun.
	mca = config.get('mca')
	if not mca:
		return ""
	return " ".join(f"--mca {k} {v}" for k, v in mca.items())

def mpi_bind_to(config):
	"""Return the mpirun '--bind-to' flag, defaulting to core binding.

	Uses dict.get instead of the original try/bare-except, which would
	have hidden any unrelated error as the default value.
	"""
	return f"--bind-to {config.get('bind-to', 'core')}"

def mpi_map_by(config):
	"""Return the mpirun '--map-by ppr:N:unit' flag.

	Falls back to one process per L3 cache when either 'map-by-count' or
	'map-by' is missing from the config. The original bare 'except' is
	narrowed to KeyError so unrelated errors are no longer masked.
	"""
	try:
		return f"--map-by ppr:{config['map-by-count']}:{config['map-by']}"
	except KeyError:
		return "--map-by ppr:1:l3"

def execute_cmd(cmd, dry_run):
	"""Run *cmd* via the project's Command helper and return (rc, output).

	The helper module lives next to this script, so the script's own
	directory is appended to sys.path before importing it. When *dry_run*
	is true the Command object is created with fake=True.
	"""
	sys.path.append(os.path.dirname(os.path.abspath(__file__)))
	from command import Command
	runner = Command(cmd, fake=dry_run)
	return runner.exec_cmd()

def split_on_commas(expr): 
	"""Split *expr* on top-level commas only, keeping bracketed ranges
	(e.g. 'node[1-3,5]') intact as single elements.

	NOTE(review): the loop re-scans a shrinking suffix of the input; it
	assumes non-nested, well-formed brackets — confirm against callers.
	"""
	l = []
	c = o = 0 
	s = expr
	stop = False
	# Peel off one top-level element per iteration until the remaining
	# suffix has no bracket or comma left to consider.
	while not stop and c != -1 and o != -1:
		o = s.find('[')
		b = s.find(']')
		c = s.find(',')
		# Skip over commas that fall inside the [...] range.
		while c > o and c < b:
			c = s.find(',', c+1)
		# The tail appended last iteration was provisional; drop it and
		# re-split it this iteration.
		if len(l):
			l.pop()
		if c < o or c > b:
			# Comma lies outside the brackets: keep the head element and
			# carry the remainder into the next iteration.
			l += [s[:s.find(',', c)], s[s.find(',', c)+1:]]
			s = l[-1]
		else:
			# No top-level comma remains: the rest splits plainly.
			l += s.split(',')
			stop = True
	for i in range(0, len(l)):
		l[i] = l[i].strip()
	return l

def expand_host_list_sub(expr):
	"""Expand one SLURM-style host expression into a list of host names.

	'node[1-3,7]' -> ['node1', 'node2', 'node3', 'node7']. A plain
	comma-separated string is split as-is; an unbalanced bracket yields [].
	"""
	open_br = expr.find('[')
	close_br = expr.find(']', open_br)

	# No brackets at all: just a comma-separated list of literal hosts.
	if open_br == -1 and close_br == -1:
		return expr.split(',')

	# Only one of the two brackets present: malformed expression.
	if open_br == -1 or close_br == -1:
		return []

	prefix = expr[:open_br]
	hosts = []

	for item in expr[open_br+1:close_br].split(','):
		bounds = item.split('-')
		# Format to the width of the range start so zero-padding in the
		# original expression ('nid[001-003]') is preserved.
		fmt = "{:0%dd}" % len(bounds[0])
		if len(bounds) == 2:
			for n in range(int(bounds[0]), int(bounds[1])+1):
				hosts.append(f'{prefix}{fmt.format(n)}')
		elif len(bounds) == 1:
			hosts.append(f'{prefix}{fmt.format(int(bounds[0]))}')

	return hosts

def expand_host_list(expr):
	"""Expand a full SLURM node-list expression into individual host names.

	Splits on top-level commas first, then expands each piece's bracketed
	range (if any) via expand_host_list_sub.
	"""
	hosts = []
	for chunk in split_on_commas(expr):
		hosts.extend(expand_host_list_sub(chunk))
	return hosts

# Process-manager plugins (as reported by 'srun --mpi=list') that this
# test knows how to launch under.
supported_pm = ['pmi', 'pmi2', 'pmix']

def is_srun_pm_supported():
	"""Return True if 'srun --mpi=list' reports at least one supported PM.

	Returns False when the srun invocation itself fails.
	"""
	rc, out = execute_cmd('srun --mpi=list', False)
	if rc:
		return False
	# Drop blank lines; skip the first non-blank line, which is the
	# command's header rather than a plugin name.
	plugins = [line.strip() for line in out.split('\n') if line.strip()][1:]
	return any(pm in plugins for pm in supported_pm)

if __name__ == '__main__':

	# Providers which do not need address specification.
	no_addr_prov = ['cxi']

	args, parser = parse_args()

	if not args.config:
		print("**A configuration file is required")
		print(parser.format_help())
		exit(-1)

	# Only the 'multinode' section of the YAML config is used.
	mnode = parse_config(args.config)['multinode']
	set_env(mnode)

	if args.launcher == 'srun':
		if not is_srun_pm_supported():
			print(f"**Supported process managers are: {','.join(supported_pm)}")
			print(parser.format_help())
			exit(-1)

	# The script assumes it's already running in a SLURM allocation. It can
	# then srun fi_multinode
	#
	if "pattern" not in mnode:
		print("Test pattern must be defined in the YAML configuration file")
		# BUG FIX: error paths previously called bare exit(), reporting
		# success (status 0) to the calling shell/CI.
		exit(1)

	launcher = args.launcher.lower()

	# Providers that need no address specification take the pattern raw;
	# everyone else gets it single-quoted for the shell. (The two original
	# cmd branches were identical except for this quoting.)
	if args.provider in no_addr_prov:
		pattern = f"{mnode['pattern']}"
	else:
		pattern = f"'{mnode['pattern']}'"

	cmd = f"fi_multinode -n {args.num_procs} -s {socket.gethostname()} " \
		f"-p {args.provider} -x {args.capability} -z {pattern} " \
		f"-I {args.iterations} -u {launcher} -T"

	if launcher == 'mpirun':
		mpi = f"mpirun {mpi_env(mnode)} {mpi_mca_params(mnode)} {mpi_bind_to(mnode)} "\
			  f"{mpi_map_by(mnode)} -np {args.num_procs} {cmd}"
	elif launcher == 'srun':
		if args.procs_per_node == -1 or args.num_procs == -1:
			print("**Need to specify --procs-per-node and --num-procs")
			print(parser.format_help())
			exit(1)
		mpi = f"srun --ntasks-per-node {args.procs_per_node} --ntasks {args.num_procs} "\
			  f"{cmd}"
	elif launcher == 'internal':
		if args.procs_per_node == -1:
			print("**Need to specify --procs-per-node")
			print(parser.format_help())
			exit(1)
		# The internal launcher drives each host itself; expand the SLURM
		# node list into explicit host names.
		hl = ",".join(expand_host_list(os.environ['SLURM_NODELIST']))
		mpi = f"runmultinode.sh -h {hl} -n {args.procs_per_node} -p {args.provider} " \
			  f"-x {args.capability} -I {args.iterations} -z {mnode['pattern']}"
		if args.ci:
			mpi += f" --ci '{args.ci}'"
	else:
		# Unreachable in practice: argparse restricts --launcher choices.
		print("**Unsupported launcher")
		print(parser.format_help())
		exit(1)

	rc, out = execute_cmd(mpi, args.dry_run)

	print(f"Command completed with {rc}\n{out}")