File: exec-ptask.py

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (103 lines) | stat: -rw-r--r-- 4,847 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# Copyright (c) 2018-2025. The SimGrid Team. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.


# This script does exactly the same thing as file s4u-exec-ptask.cpp

import sys
from simgrid import Actor, Engine, Host, this_actor, TimeoutException

def runner():
    """Actor body chaining six parallel-activity scenarios over all platform hosts.

    The scenarios, in order:
      1. a parallel activity with computation on every host and data in the
         upper triangle of the communication matrix;
      2. the same activity waited for with a 10 second timeout, which is
         expected to raise TimeoutException;
      3. a computation-only parallel activity;
      4. a parallel activity with neither computation nor communication;
      5. a parallel activity whose remaining ratio is logged until it is done;
      6. a "malleable" activity: started, suspended, cancelled and replaced by
         a new activity restricted to the first two hosts.

    NOTE(review): scenarios 3, 5 and 6 use hard-coded vectors of 3 and 9
    entries, so the platform presumably has exactly three hosts -- confirm
    against the platform file used with this example.
    """
    all_hosts = Engine.instance.all_hosts
    n_hosts = len(all_hosts)

    # Test 1
    this_actor.info("First, build a classical parallel activity, with 1 Gflop to execute on each node, "
                    "and 10MB to exchange between each pair")
    flops = [1e9] * n_hosts
    # Flat row-major n x n matrix: only the cells strictly above the diagonal carry data.
    payloads = [1e7 if col > row else 0 for row in range(n_hosts) for col in range(n_hosts)]
    this_actor.parallel_execute(all_hosts, flops, payloads)

    # Test 2
    this_actor.info("We can do the same with a timeout of 10 seconds enabled.")
    timed_ptask = this_actor.exec_init(all_hosts, flops, payloads)
    try:
        timed_ptask.wait_for(10.0)
    except TimeoutException:
        this_actor.info("Caught the expected timeout exception.")
        timed_ptask.cancel()
    else:
        # Reaching this branch means the wait returned without timing out.
        sys.exit("Woops, this did not timeout as expected... Please report that bug.")

    # Test 3
    this_actor.info("Then, build a parallel activity involving only computations (of different amounts) and no communication")
    no_payloads = []
    this_actor.parallel_execute(all_hosts, [3e8, 6e8, 1e9], no_payloads)

    # Test 4
    this_actor.info("Then, build a parallel activity with no computation nor communication (synchro only)")
    this_actor.parallel_execute(all_hosts, [], no_payloads)

    # Test 5
    this_actor.info("Then, Monitor the execution of a parallel activity")
    monitored = this_actor.exec_init(all_hosts, [1e6] * n_hosts, [0, 1e6, 0, 0, 0, 1e6, 1e6, 0, 0])
    monitored.start()
    while True:
        if monitored.test():
            break
        # Not finished yet: report progress, then sleep before polling again.
        this_actor.info(f"Remaining flop ratio: {monitored.remaining_ratio * 100:.0f}%")
        this_actor.sleep_for(5)
    monitored.wait()

    # Test 6
    this_actor.info("Finally, simulate a malleable task (a parallel execution that gets reconfigured after its start).")
    this_actor.info("  - Start a regular parallel execution, with both comm and computation")
    flops = [1e6] * n_hosts
    payloads = [0, 1e6, 0, 0, 1e6, 0, 1e6, 0, 0]
    malleable = this_actor.exec_init(all_hosts, flops, payloads)
    malleable.start()
    this_actor.sleep_for(10)
    left_ratio = malleable.remaining_ratio
    this_actor.info(f"  - After 10 seconds, {left_ratio*100:.2f}% remains to be done. Change it from 3 hosts to 2 hosts only.")
    this_actor.info("    Let's first suspend the task.")
    malleable.suspend()
    this_actor.info("  - Now, simulate the reconfiguration (modeled as a comm from the removed host to the remaining ones).")
    this_actor.parallel_execute(all_hosts, [0, 0, 0], [0, 0, 0, 0, 0, 0, 25000, 25000, 0])
    this_actor.info("  - Now, let's cancel the old task and create a new task with modified comm and computation vectors:")
    this_actor.info("    What was already done is removed, and the load of the removed host is shared between remaining ones.")
    # For each of the two remaining hosts: first drop what has already been
    # done (scale by the remaining ratio), then add the 50% extra work
    # inherited from the removed host (1 host shared between 2 => x1.5).
    kept_flops = [flops[i] * left_ratio * 1.5 for i in range(2)]
    kept_payload = payloads[1] * left_ratio * 1.5
    kept_hosts = all_hosts[:2]
    malleable.cancel()
    malleable = this_actor.exec_init(kept_hosts, kept_flops, [0, kept_payload, kept_payload, 0])
    this_actor.info("  - Done, let's wait for the task completion")
    malleable.wait()
    this_actor.info("Goodbye now!")


if __name__ == "__main__":
    # Exactly one command-line argument is expected: the platform file.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        sys.exit(f"Syntax: {sys.argv[0]} <platform_file>")

    engine = Engine.instance
    Engine.set_config("host/model:ptask_L07")  # /!\ this is required for running ptasks
    engine.load_platform(sys.argv[1])

    # Attach the runner actor, under the name "foo", to host "MyHost1", then run the engine.
    launch_host = engine.host_by_name("MyHost1")
    launch_host.add_actor("foo", runner)
    engine.run()