1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
# Copyright (c) 2010-2025. The SimGrid Team. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the license (GNU LGPL) which comes with this package.
from argparse import ArgumentParser
from typing import List
import sys
from simgrid import Actor, ActivitySet, Comm, Engine, Mailbox, this_actor
FINALIZE_MESSAGE = "finalize"
def create_parser() -> ArgumentParser:
parser = ArgumentParser()
parser.add_argument(
'--platform',
type=str,
required=True,
help='path to the platform description'
)
return parser
def get_peer_mailbox(peer_id: int) -> Mailbox:
return Mailbox.by_name(f"peer-{peer_id}")
def peer(my_id: int, message_count: int, payload_size: int, peers_count: int):
my_mailbox: Mailbox = get_peer_mailbox(my_id)
my_mailbox.set_receiver(Actor.self())
pending_comms = ActivitySet()
# Start dispatching all messages to peers others that myself
for i in range(message_count):
for peer_id in range(peers_count):
if peer_id != my_id:
peer_mailbox = get_peer_mailbox(peer_id)
message = f"Message {i} from peer {my_id}"
this_actor.info(f"Send '{message}' to '{peer_mailbox.name}'")
pending_comms.push(peer_mailbox.put_async(message, payload_size))
# Start sending messages to let peers know that they should stop
for peer_id in range(peers_count):
if peer_id != my_id:
peer_mailbox = get_peer_mailbox(peer_id)
payload = str(FINALIZE_MESSAGE)
pending_comms.push(peer_mailbox.put_async(payload, payload_size))
this_actor.info(f"Send '{payload}' to '{peer_mailbox.name}'")
this_actor.info("Done dispatching all messages")
# Retrieve all the messages other peers have been sending to me until I receive all the corresponding "Finalize"
# messages
pending_finalize_messages = peers_count - 1
while pending_finalize_messages > 0:
if my_mailbox.ready:
start = Engine.clock
received: str = my_mailbox.get()
waiting_time = Engine.clock - start
if waiting_time > 0.0:
raise AssertionError(f"Expecting the waiting time to be 0.0 because the communication was supposedly "
f"ready, but got {waiting_time} instead")
this_actor.info(f"I got a '{received}'.")
if received == FINALIZE_MESSAGE:
pending_finalize_messages -= 1
else:
this_actor.info("Nothing ready to consume yet, I better sleep for a while")
this_actor.sleep_for(0.01)
this_actor.info("I'm done, just waiting for my peers to receive the messages before exiting")
pending_comms.wait_all()
this_actor.info("Goodbye now!")
def main():
settings = create_parser().parse_known_args()[0]
e = Engine(sys.argv)
e.load_platform(settings.platform)
e.add_actor("peer", e.host_by_name("Tremblay"), peer, 0, 2, int(5e7), 3)
e.add_actor("peer", e.host_by_name("Ruby"), peer, 1, 6, int(2.5e5), 3)
e.add_actor("peer", e.host_by_name("Perl"), peer, 2, 0, int(5e7), 3)
e.run()
if __name__ == "__main__":
main()
|