File: platform-failures.py

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (114 lines) | stat: -rw-r--r-- 4,814 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
## Copyright (c) 2007-2025. The SimGrid Team. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.

"""
This example shows how to work with the state profile of a host or a link,
specifying when the resource must be turned on or off.

To set such a profile, the first way is to use a file in the XML, while the second is to use the programmatic
interface, as exemplified in the main() below. Once this profile is in place, the resource will automatically
be turned on and off.

The actors running on a host that is turned off are forcefully killed
once their on_exit callbacks are executed. They cannot avoid this fate.
Since we specified on_failure="RESTART" for each actor in the XML file,
they will be automatically restarted when the host starts again.

Communications using failed links will .. fail.
"""

import sys
from simgrid import Actor, Engine, Host, Mailbox, this_actor, NetworkFailureException, TimeoutException

def master(*args):
    """Dispatch tasks to the workers in a round-robin way, then tell them all to stop.

    Expects exactly four arguments, each convertible with int(): the number of
    tasks, the computation size (used as the message payload), the
    communication size (second argument of put()) and the number of workers.

    A put() ending in a TimeoutException or a NetworkFailureException is only
    logged: the master then carries on with the next message.
    """
    assert len(args) == 4, f"Actor master requires 4 parameters, but got {len(args)} ones."
    tasks_count, comp_size, comm_size, workers_count = (int(arg) for arg in args)

    def try_put(box, payload, size, limit):
        """Attempt one put() on box; log the expected failures and report success as a bool."""
        # NOTE(review): `limit` is presumably the put() timeout, given the TimeoutException handler.
        try:
            box.put(payload, size, limit)
        except TimeoutException:
            this_actor.info(f"Mmh. Got timeouted while speaking to '{box.name}'. Nevermind. Let's keep going!")
            return False
        except NetworkFailureException:
            this_actor.info(f"Mmh. The communication with '{box.name}' failed. Nevermind. Let's keep going!")
            return False
        return True

    this_actor.info(f"Got {workers_count} workers and {tasks_count} tasks to process")

    # Hand out every task, cycling over the worker mailboxes.
    for task_idx in range(tasks_count):
        target = Mailbox.by_name(f"worker-{task_idx % workers_count}")
        this_actor.info(f"Send a message to {target.name}")
        if try_put(target, comp_size, comm_size, 10.0):
            this_actor.info(f"Send to {target.name} completed")

    this_actor.info("All tasks have been dispatched. Let's tell everybody the computation is over.")

    # A negative payload is the "finalize" message that makes a worker stop.
    for worker_idx in range(workers_count):
        try_put(Mailbox.by_name(f"worker-{worker_idx}"), -1.0, 0, 1.0)

    this_actor.info("Goodbye now!")

def worker(*args):
    """Serve the mailbox "worker-<id>" until a non-positive compute cost is received.

    Expects exactly one argument, convertible with int(): the worker's id,
    which selects the mailbox to listen on.

    Each received value strictly greater than zero is executed as a
    computation of that cost; any other value ends the actor. A
    NetworkFailureException raised while handling a message is logged and the
    worker goes back to waiting.
    """
    assert len(args) == 1, "Expecting one parameter"
    inbox = Mailbox.by_name(f"worker-{int(args[0])}")

    while True:
        try:
            this_actor.info(f"Waiting a message on {inbox.name}")
            cost = inbox.get()
            if cost > 0:
                # A valid cost: run a computation of that size, then wait for the next message.
                this_actor.info("Start execution...")
                this_actor.execute(cost)
                this_actor.info("Execution complete.")
                continue
            # An invalid cost is the signal to stop.
            this_actor.info("I'm done. See you!")
            return
        except NetworkFailureException:
            this_actor.info("Mmh. Something went wrong. Nevermind. Let's keep going!")

def sleeper():
    """Log a message, sleep for a duration of 1, then log that the sleep is over."""
    nap_duration = 1  # presumably simulated seconds -- confirm against the sleep_for() documentation
    this_actor.info("Start sleeping...")
    this_actor.sleep_for(nap_duration)
    this_actor.info("done sleeping.")

if __name__ == '__main__':
    # Check the command line explicitly rather than with `assert`: assertions are removed
    # under `python -O`, which would turn a missing argument into an IndexError below.
    if len(sys.argv) <= 2:
        sys.exit("Usage: python platform-failures.py platform_file deployment_file")

    e = Engine(sys.argv)

    # This is how to attach a profile to a host that is created from the XML file.
    # This should be done before calling load_platform(), as the on_creation() event is fired when loading the platform.
    # You can never set a new profile to a resource that already has one.
    def on_creation(host):
        """Attach a state profile to the host named "Bourrassa" when it is created."""
        if host.name == "Bourrassa":
            # NOTE(review): each profile line is presumably "<date> <state>" (0 = off, 1 = on) -- confirm.
            host.set_state_profile("67 0\n70 1\n", 0)
    Host.on_creation_cb(on_creation)

    e.load_platform(sys.argv[1])

    # The deployment file refers to the actors by these names, so register them before loading it.
    e.register_actor("master", master)
    e.register_actor("worker", worker)
    e.load_deployment(sys.argv[2])

    # Add a new host programmatically, and attach a state profile to it
    lili = e.netzone_root.add_host("Lilibeth", 1e15)
    lili.set_state_profile("4 0\n5 1\n", 10)
    lili.seal()

    # Create an actor on that new host, to monitor its own state
    actor = lili.add_actor("sleeper", sleeper)
    actor.set_auto_restart(True)

    e.run()

    this_actor.info(f"Simulation time {e.clock:.4f}")