File: app-masterworker.c

package info (click to toggle)
simgrid 4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 38,980 kB
  • sloc: cpp: 123,583; ansic: 66,779; python: 8,358; java: 6,406; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,337; perl: 1,436; makefile: 105; lisp: 49; javascript: 7; sed: 6
file content (102 lines) | stat: -rw-r--r-- 4,045 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/* Copyright (c) 2010-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/actor.h"
#include "simgrid/engine.h"
#include "simgrid/forward.h"
#include "simgrid/mailbox.h"
#include "xbt/log.h"
#include "xbt/str.h"
#include "xbt/sysdep.h"

/* Sentinel payload value: master() sends it to every worker mailbox once all tasks are dispatched,
 * and worker() leaves its receive loop when the received payload compares equal to it. */
#define FINALIZE 221297 /* a magic number to tell people to stop working */

#include <stdio.h> /* snprintf */

/* Logging category of this file; presumably the one the XBT_INFO() calls below log to
 * (NOTE(review): confirm against the xbt/log.h documentation). */
XBT_LOG_NEW_DEFAULT_CATEGORY(app_masterworker, "Messages specific for this example");

/* Main function of the master actor.
 *
 * Arguments, as given in the XML deployment file:
 *  - argv[1]: number of tasks to dispatch
 *  - argv[2]: computational size of each task (sent to the worker as the message payload)
 *  - argv[3]: communication size of each task (the size passed to sg_mailbox_put())
 *  - argv[4]: number of workers; worker number k is reached through the mailbox "worker-k"
 *
 * Every payload is allocated here; worker() releases it after reading it.
 */
static void master(int argc, char* argv[])
{
  xbt_assert(argc == 5, "The master function expects 4 arguments from the XML deployment file");
  long number_of_tasks = xbt_str_parse_int(argv[1], "Invalid amount of tasks");       /* - Number of tasks      */
  double comp_size     = xbt_str_parse_double(argv[2], "Invalid computational size"); /* - Compute cost    */
  long comm_size       = xbt_str_parse_int(argv[3], "Invalid communication size");    /* - Communication size */
  long workers_count   = xbt_str_parse_int(argv[4], "Invalid amount of workers");     /* - Number of workers    */

  /* The round-robin selection below computes i % workers_count, which is undefined for a zero divisor.
   * Having no worker is only acceptable when there is nothing to dispatch. */
  xbt_assert(number_of_tasks <= 0 || workers_count > 0,
             "Cannot dispatch %ld tasks to %ld workers: at least one worker is needed", number_of_tasks,
             workers_count);

  XBT_INFO("Got %ld workers and %ld tasks to process", workers_count, number_of_tasks);

  /* The index is a long, like the bounds it is compared to, so that a huge task count cannot overflow it */
  for (long i = 0; i < number_of_tasks; i++) { /* For each task to be executed: */
    char mailbox_name[80];
    snprintf(mailbox_name, sizeof mailbox_name, "worker-%ld",
             i % workers_count); /* - Select a @ref worker in a round-robin way */
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

    /* Only log every task for small runs; otherwise log one task out of 10000 */
    if (number_of_tasks < 10000 || i % 10000 == 0) {
      char task_name[80];
      snprintf(task_name, sizeof task_name, "Task_%ld", i);
      XBT_INFO("Sending \"%s\" (of %ld) to mailbox \"%s\"", task_name, number_of_tasks, mailbox_name);
    }

    double* payload = xbt_malloc(sizeof *payload);
    *payload        = comp_size;
    sg_mailbox_put(mailbox, payload, comm_size); /* - Send the amount of flops to compute to the @ref worker */
  }

  XBT_INFO("All tasks have been dispatched. Let's tell everybody the computation is over.");
  for (long i = 0; i < workers_count; i++) { /* - Eventually tell all the workers to stop by sending a "finalize" task */
    char mailbox_name[80];
    snprintf(mailbox_name, sizeof mailbox_name, "worker-%ld", i);
    sg_mailbox_t mailbox = sg_mailbox_by_name(mailbox_name);

    double* payload = xbt_malloc(sizeof *payload);
    *payload        = FINALIZE;
    sg_mailbox_put(mailbox, payload, 0);
  }
}

/* Main function of the worker actors.
 *
 * Its single argument (argv[1], from the XML deployment file) is the numerical rank of the worker.
 * The worker listens on the mailbox "worker-<rank>" and executes the amount of flops found in each
 * received payload, until it receives the FINALIZE value.
 */
static void worker(int argc, char* argv[])
{
  xbt_assert(argc == 2,
             "The worker expects a single argument from the XML deployment file: its worker ID (its numerical rank)");

  long rank = xbt_str_parse_int(argv[1], "Invalid argument");

  char inbox_name[80];
  snprintf(inbox_name, sizeof inbox_name - 1, "worker-%ld", rank);
  sg_mailbox_t inbox = sg_mailbox_by_name(inbox_name);

  /* Keep receiving messages from the @ref master until it tells us to stop */
  int keep_going = 1;
  while (keep_going) {
    double* msg = (double*)sg_mailbox_get(inbox);

    if (*msg != FINALIZE)
      sg_actor_execute(*msg); /* - A regular task: process the received number of flops */
    else
      keep_going = 0; /* - 'finalize' was received: this was the last message */

    xbt_free(msg); /* - The received payload is released here in both cases */
  }
  XBT_INFO("I'm done. See you!");
}

/* Entry point: load the platform and the deployment given on the command line, run the simulation
 * and report the simulated time at which it ended. */
int main(int argc, char* argv[])
{
  /* simgrid_init() receives &argc, so it may change the argument count (presumably by removing
   * SimGrid's own options -- confirm in the SimGrid documentation); arguments are checked afterwards. */
  simgrid_init(&argc, argv);
  xbt_assert(argc > 2,
             "Usage: %s platform_file deployment_file\n"
             "\tExample: %s platform.xml deployment.xml\n",
             argv[0], argv[0]);

  const char* platform_file   = argv[1];
  const char* deployment_file = argv[2];

  /* - Describe the simulated platform */
  simgrid_load_platform(platform_file);

  /* - Make both actor functions known under the names given to simgrid_register_function(),
   *   then load the deployment file */
  simgrid_register_function("master", master);
  simgrid_register_function("worker", worker);
  simgrid_load_deployment(deployment_file);

  /* - Execute the simulation */
  simgrid_run();

  XBT_INFO("Simulation time %g", simgrid_get_clock());

  return 0;
}