File: platform-failures.c

package info (click to toggle)
simgrid 4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 38,980 kB
  • sloc: cpp: 123,583; ansic: 66,779; python: 8,358; java: 6,406; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,337; perl: 1,436; makefile: 105; lisp: 49; javascript: 7; sed: 6
file content (140 lines) | stat: -rw-r--r-- 4,603 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/* Copyright (c) 2007-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/actor.h"
#include "simgrid/comm.h"
#include "simgrid/engine.h"
#include "simgrid/exec.h"
#include "simgrid/host.h"
#include "simgrid/mailbox.h"

#include "xbt/log.h"
#include "xbt/str.h"
#include "xbt/sysdep.h"

#include <stdio.h> /* snprintf */

/* Sentinel payload value: the master sends it to every worker once all tasks have been dispatched, and a worker
 * leaves its receive loop when it gets it. */
#define FINALIZE 221297

/* Default logging category used by the XBT_INFO() calls of this file. */
XBT_LOG_NEW_DEFAULT_CATEGORY(platform_failures, "Messages specific for this example");

/* Master actor: dispatches the tasks to the workers in round-robin order, then tells every worker to stop.
 *
 * Expected arguments:
 *   argv[1]  number of tasks to dispatch
 *   argv[2]  computational size of each task (sent as the message payload)
 *   argv[3]  communication size of each task message
 *   argv[4]  number of workers; worker i is reached through the mailbox named "worker-<i>"
 *
 * Failures are tolerated: a send that fails on the network or times out is logged and skipped.
 * Payload ownership: a payload that was delivered (SG_OK) is freed by the receiving worker; a payload whose
 * send failed or timed out is freed here.
 */
static void master(int argc, char* argv[])
{
  xbt_assert(argc == 5);
  long number_of_tasks  = xbt_str_parse_int(argv[1], "Invalid amount of tasks");
  double task_comp_size = xbt_str_parse_double(argv[2], "Invalid computational size");
  long task_comm_size   = xbt_str_parse_int(argv[3], "Invalid communication size");
  long workers_count    = xbt_str_parse_int(argv[4], "Invalid amount of workers");

  XBT_INFO("Got %ld workers and %ld tasks to process", workers_count, number_of_tasks);

  /* The destination of each task is computed with a modulo on workers_count, and a modulo by zero is undefined
   * behavior. Only complain when at least one task would actually be dispatched. */
  xbt_assert(number_of_tasks <= 0 || workers_count != 0, "Cannot dispatch %ld tasks without any worker",
             number_of_tasks);

  /* Counters are long, like the bounds they are compared to, so that large counts cannot overflow them. */
  for (long i = 0; i < number_of_tasks; i++) {
    char mailbox_name[256];
    snprintf(mailbox_name, sizeof(mailbox_name), "worker-%ld", i % workers_count);
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

    XBT_INFO("Send a message to %s", mailbox_name);
    double* payload = (double*)xbt_malloc(sizeof(double));
    *payload        = task_comp_size;
    sg_comm_t comm  = sg_mailbox_put_async(mailbox, payload, task_comm_size);

    /* Give up on this task if the send is not over once the 10.0 timeout has expired */
    switch (sg_comm_wait_for(comm, 10.0)) {
      case SG_OK:
        /* Delivered: the worker is now in charge of freeing the payload */
        XBT_INFO("Send to %s completed", mailbox_name);
        break;
      case SG_ERROR_NETWORK:
        XBT_INFO("Mmh. The communication with '%s' failed. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      case SG_ERROR_TIMEOUT:
        XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      default:
        xbt_die("Unexpected behavior");
    }
  }

  XBT_INFO("All tasks have been dispatched. Let's tell everybody the computation is over.");
  for (long i = 0; i < workers_count; i++) {
    char mailbox_name[256];
    snprintf(mailbox_name, sizeof(mailbox_name), "worker-%ld", i);
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);
    double* payload      = (double*)xbt_malloc(sizeof(double));
    *payload             = FINALIZE;
    sg_comm_t comm       = sg_mailbox_put_async(mailbox, payload, 0);

    /* The stop message gets a shorter timeout (1.0) than the tasks */
    switch (sg_comm_wait_for(comm, 1.0)) {
      case SG_ERROR_NETWORK:
        XBT_INFO("Mmh. Can't reach '%s'! Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      case SG_ERROR_TIMEOUT:
        XBT_INFO("Mmh. Got timeouted while speaking to '%s'. Nevermind. Let's keep going!", mailbox_name);
        xbt_free(payload);
        break;
      case SG_OK:
        /* nothing: the worker frees the payload */
        break;
      default:
        xbt_die("Unexpected behavior with '%s'", mailbox_name);
    }
  }

  XBT_INFO("Goodbye now!");
}

/* Worker actor: receives messages on the mailbox "worker-<id>" (id is read from argv[1]) until it gets the
 * FINALIZE value.
 *
 * Every other successfully received value is passed to sg_actor_execute() as the amount of work to run.
 * The worker owns each payload it receives and frees it. A network error is logged, and any unsuccessful
 * reception simply leads to a new reception attempt.
 */
static void worker(int argc, char* argv[])
{
  xbt_assert(argc == 2);
  long worker_id = xbt_str_parse_int(argv[1], "Invalid argument");

  char inbox_name[80];
  snprintf(inbox_name, sizeof(inbox_name) - 1, "worker-%ld", worker_id);
  sg_mailbox_t inbox = sg_mailbox_by_name(inbox_name);

  for (;;) {
    XBT_INFO("Waiting a message on %s", inbox_name);
    double* message;
    sg_comm_t reception = sg_mailbox_get_async(inbox, (void**)&message);
    sg_error_t status   = sg_comm_wait(reception);

    if (status == SG_ERROR_NETWORK) {
      XBT_INFO("Mmh. Something went wrong. Nevermind. Let's keep going!");
      continue;
    }
    if (status != SG_OK) {
      /* Any other error is silently ignored: just wait for the next message */
      continue;
    }

    /* Copy the value out so that the payload can be released right away */
    double amount = *message;
    xbt_free(message);

    if (amount == FINALIZE) {
      break;
    }

    XBT_INFO("Start execution...");
    sg_actor_execute(amount);
    XBT_INFO("Execution complete.");
  }
  XBT_INFO("I'm done. See you!");
}

/* Entry point: loads the platform (argv[1]), registers the "master" and "worker" actor functions, loads the
 * deployment (argv[2]), runs the simulation and logs the final simulated clock. */
int main(int argc, char* argv[])
{
  /* simgrid_init() is handed &argc and argv; the argument count is only checked afterwards */
  simgrid_init(&argc, argv);

  const char* program = argv[0];
  xbt_assert(argc > 2,
             "Usage: %s platform_file deployment_file\n"
             "\tExample: %s platform.xml deployment.xml\n",
             program, program);

  const char* platform_file   = argv[1];
  const char* deployment_file = argv[2];

  simgrid_load_platform(platform_file);

  /* The actor functions are registered under these names before the deployment file gets loaded */
  simgrid_register_function("master", master);
  simgrid_register_function("worker", worker);
  simgrid_load_deployment(deployment_file);

  simgrid_run();

  double final_clock = simgrid_get_clock();
  XBT_INFO("Simulation time %g", final_clock);

  return 0;
}