/* Copyright (c) 2007-2025. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */
#include "simgrid/actor.h"
#include "simgrid/comm.h"
#include "simgrid/engine.h"
#include "simgrid/exec.h"
#include "simgrid/host.h"
#include "simgrid/mailbox.h"
#include "xbt/log.h"
#include "xbt/str.h"
#include "xbt/sysdep.h"
#include <stdio.h> /* snprintf */
/* Sentinel payload value: master() sends it to every worker mailbox once all tasks are
 * dispatched, and worker() leaves its receive loop when the received payload equals it. */
#define FINALIZE 221297 /* a magic number to tell people to stop working */
/* Logging category of this file; presumably the one the XBT_INFO calls below log to. */
XBT_LOG_NEW_DEFAULT_CATEGORY(platform_failures, "Messages specific for this example");
/**
 * Master actor: dispatches tasks to the workers in round-robin order, then sends every
 * worker the FINALIZE value.
 *
 * Requires argc == 5:
 *   argv[1]  number of tasks to dispatch
 *   argv[2]  computational size of a task (sent as the message payload)
 *   argv[3]  communication size of a task message
 *   argv[4]  number of workers (must be strictly positive if there are tasks to dispatch)
 *
 * Task i is sent to the mailbox named "worker-<i % workers_count>".
 *
 * Payload ownership: each payload is heap-allocated here. When the send succeeds (SG_OK) it
 * is not freed here, as worker() frees what it receives. When the send fails or times out,
 * it is freed here.
 */
static void master(int argc, char* argv[])
{
  xbt_assert(argc == 5);
  long number_of_tasks  = xbt_str_parse_int(argv[1], "Invalid amount of tasks");
  double task_comp_size = xbt_str_parse_double(argv[2], "Invalid computational size");
  long task_comm_size   = xbt_str_parse_int(argv[3], "Invalid communication size");
  long workers_count    = xbt_str_parse_int(argv[4], "Invalid amount of workers");

  /* The dispatch loop computes i % workers_count, which is undefined for a zero divisor. */
  xbt_assert(workers_count > 0 || number_of_tasks <= 0, "Cannot dispatch %ld tasks to %ld workers", number_of_tasks,
             workers_count);

  XBT_INFO("Got %ld workers and %ld tasks to process", workers_count, number_of_tasks);

  /* The index is a long, like the bounds it is compared against, so that it cannot overflow. */
  for (long i = 0; i < number_of_tasks; i++) {
    char mailbox_name[256];
    snprintf(mailbox_name, sizeof mailbox_name, "worker-%ld", i % workers_count);
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

    XBT_INFO("Send a message to %s", mailbox_name);
    double* payload = xbt_malloc(sizeof *payload);
    *payload        = task_comp_size;
    sg_comm_t comm  = sg_mailbox_put_async(mailbox, payload, task_comm_size);

    /* Give up on this task if the send has not completed within a timeout of 10.0. */
    switch (sg_comm_wait_for(comm, 10.0)) {
      case SG_OK:
        XBT_INFO("Send to %s completed", mailbox_name);
        break;
      case SG_ERROR_NETWORK:
        XBT_INFO("Mmh. The communication with '%s' failed. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload); /* never delivered: still ours to release */
        break;
      case SG_ERROR_TIMEOUT:
        XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload); /* never delivered: still ours to release */
        break;
      default:
        xbt_die("Unexpected behavior");
    }
  }

  XBT_INFO("All tasks have been dispatched. Let's tell everybody the computation is over.");
  for (long i = 0; i < workers_count; i++) {
    char mailbox_name[256];
    /* i is already in [0, workers_count), so no modulo is needed here. */
    snprintf(mailbox_name, sizeof mailbox_name, "worker-%ld", i);
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

    double* payload = xbt_malloc(sizeof *payload);
    *payload        = FINALIZE;
    sg_comm_t comm  = sg_mailbox_put_async(mailbox, payload, 0);

    /* Shorter timeout (1.0) than for the tasks: an unreachable worker is simply skipped. */
    switch (sg_comm_wait_for(comm, 1.0)) {
      case SG_ERROR_NETWORK:
        XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      case SG_ERROR_TIMEOUT:
        XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      case SG_OK:
        /* nothing: the worker frees the payload */
        break;
      default:
        xbt_die("Unexpected behavior with '%s'", mailbox_name);
    }
  }

  XBT_INFO("Goodbye now!");
}
/**
 * Worker actor: receives messages on the mailbox "worker-<id>" (id is argv[1]) until one of
 * them carries the FINALIZE value.
 *
 * Any other received value is passed to sg_actor_execute(). Every received payload is freed
 * here. A receive that does not end with SG_OK is retried; SG_ERROR_NETWORK is logged.
 */
static void worker(int argc, char* argv[])
{
  xbt_assert(argc == 2);

  long id = xbt_str_parse_int(argv[1], "Invalid argument");
  char mailbox_name[80];
  snprintf(mailbox_name, 79, "worker-%ld", id);
  sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

  for (;;) {
    XBT_INFO("Waiting a message on %s", mailbox_name);

    double* payload;
    sg_comm_t comm  = sg_mailbox_get_async(mailbox, (void**)&payload);
    sg_error_t status = sg_comm_wait(comm);

    /* Failed receive: payload must not be touched, just wait for the next message. */
    if (status != SG_OK) {
      if (status == SG_ERROR_NETWORK) {
        XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
      }
      continue;
    }

    /* Copy the value out so the buffer can be released before acting on it. */
    double received = *payload;
    xbt_free(payload);

    if (received == FINALIZE) {
      break;
    }

    XBT_INFO("Start execution...");
    sg_actor_execute(received);
    XBT_INFO("Execution complete.");
  }

  XBT_INFO("I'm done. See you!");
}
/**
 * Entry point: loads the platform file (argv[1]), registers the "master" and "worker" actor
 * functions, loads the deployment file (argv[2]), runs the simulation and logs the final
 * simulated clock value.
 */
int main(int argc, char* argv[])
{
  /* simgrid_init() is handed &argc, so the argument count is validated only after it returns. */
  simgrid_init(&argc, argv);
  xbt_assert(argc > 2,
             "Usage: %s platform_file deployment_file\n"
             "\tExample: %s platform.xml deployment.xml\n",
             argv[0], argv[0]);

  const char* platform_file   = argv[1];
  const char* deployment_file = argv[2];

  simgrid_load_platform(platform_file);

  /* Both actor functions are registered before the deployment file is loaded. */
  simgrid_register_function("master", master);
  simgrid_register_function("worker", worker);
  simgrid_load_deployment(deployment_file);

  simgrid_run();

  XBT_INFO("Simulation time %g", simgrid_get_clock());
  return 0;
}
|