1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
import argparse
import concurrent.futures
import importlib
import time
from typing import Any, Dict, Literal
import parsl
from parsl.dataflow.dflow import DataFlowKernel
from parsl.errors import InternalConsistencyError
# Named values accepted by --iterate (checked in iteration_mode); any other
# value must be a comma-separated list of explicit task counts.
VALID_NAMED_ITERATION_MODES = ("estimate", "exponential")
# In the named iteration modes, performance() keeps iterating until at least
# this many iterations have run, regardless of how long each one took.
min_iterations = 2
# TODO: factor with conftest.py where this is copy/pasted from?
def load_dfk_from_config(filename: str) -> DataFlowKernel:
    """Load a Parsl configuration from a Python file and start a DFK with it.

    The file is executed once as an anonymous module. It must define either a
    module-level ``config`` object or a zero-argument ``fresh_config()``
    factory; ``config`` wins when both are present.

    :param filename: path to the Python file defining the configuration.
    :returns: the DataFlowKernel returned by ``parsl.load``.
    :raises RuntimeError: if the file cannot be imported or loaded, or if it
        defines neither ``config`` nor ``fresh_config``.
    """
    # ``import importlib`` alone does not guarantee that the ``util``
    # submodule has been loaded, so import it explicitly here.
    import importlib.util

    spec = importlib.util.spec_from_file_location('', filename)
    if spec is None:
        raise RuntimeError("Could not import configuration")

    module = importlib.util.module_from_spec(spec)

    if spec.loader is None:
        raise RuntimeError("Could not load configuration")

    # Execute the configuration file exactly once: a second execution would
    # repeat any side effects the file has (and build a throwaway module).
    spec.loader.exec_module(module)

    if hasattr(module, 'config'):
        return parsl.load(module.config)
    elif hasattr(module, 'fresh_config'):
        return parsl.load(module.fresh_config())
    else:
        raise RuntimeError("Config module does not define config or fresh_config")
@parsl.python_app
def app(extra_payload: Any, parsl_resource_specification: Dict = {}) -> int:
    """Trivial benchmark task that always answers 7.

    ``extra_payload`` is ignored by the body; it only exists so that its
    bytes travel along with every invocation. ``parsl_resource_specification``
    is likewise unused here: callers pass the ``--resources`` value through it.
    """
    # NOTE(review): the ``{}`` default object is shared across calls, but it is
    # never mutated in this body.
    expected_answer = 7
    return expected_answer
def performance(*, resources: dict, target_t: float, args_extra_size: int, iterate_mode: str | list[int]) -> None:
delta_t: float
iteration = 1
args_extra_payload = "x" * args_extra_size
if isinstance(iterate_mode, list):
n = iterate_mode[0]
else:
n = 10
iterate = True
while iterate:
print(f"==== Iteration {iteration} ====")
print(f"Will run {n} tasks")
start_t = time.time()
fs = []
print("Submitting tasks / invoking apps")
for _ in range(n):
fs.append(app(args_extra_payload, parsl_resource_specification=resources))
submitted_t = time.time()
print(f"All {n} tasks submitted ... waiting for completion")
print(f"Submission took {submitted_t - start_t:.3f} seconds = {n / (submitted_t - start_t):.3f} tasks/second")
for f in concurrent.futures.as_completed(fs):
assert f.result() == 7
end_t = time.time()
delta_t = end_t - start_t
rate = n / delta_t
print(f"Runtime: actual {delta_t:.3f}s vs target {target_t}s")
print(f"Tasks per second: {rate:.3f}")
iteration += 1
# decide upon next iteration
match iterate_mode:
case "estimate":
n = max(1, int(target_t * rate))
iterate = delta_t < (0.75 * target_t) or iteration <= min_iterations
case "exponential":
n = int(n * 2)
iterate = delta_t < target_t or iteration <= min_iterations
case seq if isinstance(seq, list) and iteration <= len(seq):
n = seq[iteration - 1]
iterate = True
case seq if isinstance(seq, list):
iterate = False
case _:
raise InternalConsistencyError(f"Bad iterate mode {iterate_mode} - should have been validated at arg parse time")
def validate_int_list(v: str) -> list[int] | Literal[False]:
try:
return list(map(int, v.split(",")))
except ValueError:
return False
def iteration_mode(v: str) -> str | list[int]:
match v:
case s if s in VALID_NAMED_ITERATION_MODES:
return s
case _ if seq := validate_int_list(v):
return seq
case _:
raise argparse.ArgumentTypeError(f"Invalid iteration mode: {v}")
def cli_run() -> None:
    """Command line entry point for ``parsl-perf``.

    Parses arguments, loads the configuration file named by ``--config`` and
    runs ``performance`` while that DataFlowKernel's context is active.
    """
    parser = argparse.ArgumentParser(
        prog="parsl-perf",
        description="Measure performance of Parsl configurations",
        epilog="""
Example usage: python -m parsl.benchmark.perf --config parsl/tests/configs/workqueue_blocks.py --resources '{"cores":1, "memory":0, "disk":0}'
""")

    parser.add_argument("--config", required=True, help="path to Python file that defines a configuration")
    parser.add_argument("--resources", metavar="EXPR", help="parsl_resource_specification dictionary")
    parser.add_argument("--time", metavar="SECONDS", help="target number of seconds for an iteration", default=120, type=float)
    parser.add_argument("--argsize", metavar="BYTES", help="extra bytes to add into app invocation arguments", default=0, type=int)
    parser.add_argument("--version", action="version", version=f"parsl-perf from Parsl {parsl.__version__}")
    parser.add_argument("--iterate",
                        metavar="MODE",
                        help="Iteration mode: " + ", ".join(VALID_NAMED_ITERATION_MODES) + ", or sequence of explicit sizes",
                        type=iteration_mode,
                        default="estimate")

    args = parser.parse_args()

    if args.resources:
        # SECURITY(review): --resources is evaluated as arbitrary Python. It comes
        # from the local command line (the user already controls this process),
        # but it must never be fed untrusted input; ast.literal_eval would be
        # enough for plain dictionary literals.
        resources = eval(args.resources)
        # Fail at argument-parsing time rather than deep inside task submission.
        if not isinstance(resources, dict):
            parser.error(f"--resources must evaluate to a dictionary, got {type(resources).__name__}")
    else:
        resources = {}

    with load_dfk_from_config(args.config):
        performance(resources=resources, target_t=args.time, args_extra_size=args.argsize, iterate_mode=args.iterate)
        print("Tests complete - leaving DFK block")

    print("The end")
# Run the CLI when this file is executed directly, e.g. the
# ``python -m parsl.benchmark.perf`` usage shown in the --help epilog.
if __name__ == "__main__":
    cli_run()
|