1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
# Copyright (c) 2018-2025. The SimGrid Team. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.
# This script does exactly the same thing as file s4u-exec-ptask.cpp
import sys
from simgrid import Actor, Engine, Host, this_actor, TimeoutException
def runner():
hosts = Engine.instance.all_hosts
hosts_count = len(hosts)
# Test 1
this_actor.info("First, build a classical parallel activity, with 1 Gflop to execute on each node, "
"and 10MB to exchange between each pair")
computation_amounts = [1e9]*hosts_count
communication_amounts = [0]*hosts_count*hosts_count
for i in range(hosts_count):
for j in range(i+1, hosts_count):
communication_amounts[i * hosts_count + j] = 1e7
this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
# Test 2
this_actor.info("We can do the same with a timeout of 10 seconds enabled.")
activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
try:
activity.wait_for(10.0)
sys.exit("Woops, this did not timeout as expected... Please report that bug.")
except TimeoutException:
this_actor.info("Caught the expected timeout exception.")
activity.cancel()
# Test 3
this_actor.info("Then, build a parallel activity involving only computations (of different amounts) and no communication")
computation_amounts = [3e8, 6e8, 1e9]
communication_amounts = []
this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
# Test 4
this_actor.info("Then, build a parallel activity with no computation nor communication (synchro only)")
computation_amounts = []
this_actor.parallel_execute(hosts, computation_amounts, communication_amounts)
# Test 5
this_actor.info("Then, Monitor the execution of a parallel activity")
computation_amounts = [1e6]*hosts_count
communication_amounts = [0, 1e6, 0, 0, 0, 1e6, 1e6, 0, 0]
activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
activity.start()
while not activity.test():
ratio = activity.remaining_ratio * 100
this_actor.info(f"Remaining flop ratio: {ratio:.0f}%")
this_actor.sleep_for(5)
activity.wait()
# Test 6
this_actor.info("Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).")
this_actor.info(" - Start a regular parallel execution, with both comm and computation")
computation_amounts = [1e6]*hosts_count
communication_amounts = [0, 1e6, 0, 0, 1e6, 0, 1e6, 0, 0]
activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
activity.start()
this_actor.sleep_for(10)
remaining_ratio = activity.remaining_ratio
this_actor.info(f" - After 10 seconds, {remaining_ratio*100:.2f}% remains to be done. Change it from 3 hosts to 2 hosts only.")
this_actor.info(" Let's first suspend the task.")
activity.suspend()
this_actor.info(" - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).")
rescheduling_comp = [0, 0, 0]
rescheduling_comm = [0, 0, 0, 0, 0, 0, 25000, 25000, 0]
this_actor.parallel_execute(hosts, rescheduling_comp, rescheduling_comm)
this_actor.info(" - Now, let's cancel the old task and create a new task with modified comm and computation vectors:")
this_actor.info(" What was already done is removed, and the load of the removed host is shared between remaining ones.")
for i in range(2):
# remove what we've done so far, for both comm and compute load
computation_amounts[i] *= remaining_ratio
communication_amounts[i] *= remaining_ratio
# The work from 1 must be shared between 2 remaining ones. 1/2=50% of extra work for each
computation_amounts[i] *= 1.5
communication_amounts[i] *= 1.5
hosts = hosts[:2]
computation_amounts = computation_amounts[:2]
remaining_comm = communication_amounts[1]
communication_amounts = [0, remaining_comm, remaining_comm, 0]
activity.cancel()
activity = this_actor.exec_init(hosts, computation_amounts, communication_amounts)
this_actor.info(" - Done, let's wait for the task completion")
activity.wait()
this_actor.info("Goodbye now!")
if __name__ == "__main__":
if len(sys.argv) != 2:
sys.exit(f"Syntax: {sys.argv[0]} <platform_file>")
platform = sys.argv[1]
engine = Engine.instance
Engine.set_config("host/model:ptask_L07") # /!\ this is required for running ptasks
engine.load_platform(platform)
engine.host_by_name("MyHost1").add_actor("foo", runner)
engine.run()
|