File: app-masterworker.c

package info (click to toggle)
simgrid 3.21%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 21,972 kB
  • sloc: cpp: 88,193; ansic: 69,244; fortran: 6,089; f90: 5,162; xml: 4,861; java: 4,250; perl: 2,056; python: 1,193; sh: 1,159; makefile: 57; sed: 6
file content (91 lines) | stat: -rw-r--r-- 3,697 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/* Copyright (c) 2010-2018. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/msg.h"

#include <stdio.h> /* snprintf */
#include <string.h> /* strcmp */

XBT_LOG_NEW_DEFAULT_CATEGORY(msg_app_masterworker, "Messages specific for this msg example");

/* Main function of the master process.
 *
 * Expects four arguments from the XML deployment file:
 *   argv[1]: number of tasks to dispatch
 *   argv[2]: computational size of each task
 *   argv[3]: communication size of each task
 *   argv[4]: number of workers
 *
 * Tasks are sent in a round-robin way to the mailboxes "worker-0" .. "worker-<workers_count - 1>"; afterwards one
 * "finalize" task is sent to each of these mailboxes. Always returns 0.
 */
static int master(int argc, char *argv[])
{
  xbt_assert(argc==5, "The master function expects 4 arguments from the XML deployment file");
  long number_of_tasks = xbt_str_parse_int(argv[1], "Invalid amount of tasks: %s");    /* - Number of tasks      */
  double comp_size = xbt_str_parse_double(argv[2], "Invalid computational size: %s");  /* - Task compute cost    */
  double comm_size = xbt_str_parse_double(argv[3], "Invalid communication size: %s");  /* - Task communication size */
  long workers_count = xbt_str_parse_int(argv[4], "Invalid amount of workers: %s");    /* - Number of workers    */

  XBT_INFO("Got %ld workers and %ld tasks to process", workers_count, number_of_tasks);

  /* The round-robin selection below evaluates `i % workers_count`, which is undefined for a zero divisor.
   * The check is skipped when there is nothing to dispatch, since the modulo is never evaluated in that case. */
  xbt_assert(number_of_tasks <= 0 || workers_count > 0,
             "Cannot dispatch %ld tasks to %ld workers: at least one worker is required",
             number_of_tasks, workers_count);

  /* The counter has the same type as number_of_tasks so that it cannot overflow before reaching the bound */
  for (long i = 0; i < number_of_tasks; i++) {  /* For each task to be executed: */
    char mailbox[80];
    char task_name[80];

    snprintf(mailbox, sizeof mailbox, "worker-%ld", i % workers_count); /* - Select a @ref worker in a round-robin way */
    snprintf(task_name, sizeof task_name, "Task_%ld", i);
    msg_task_t task = MSG_task_create(task_name, comp_size, comm_size, NULL);   /* - Create a task */

    /* Log every dispatch for small runs, but only one out of 10000 for large ones.
     * The name logged is the one just given to the task, so the task structure needs not be accessed directly. */
    if (number_of_tasks < 10000 || i % 10000 == 0) {
      XBT_INFO("Sending \"%s\" (of %ld) to mailbox \"%s\"", task_name, number_of_tasks, mailbox);
    }

    MSG_task_send(task, mailbox); /* - Send the task to the @ref worker */
  }

  XBT_INFO("All tasks have been dispatched. Let's tell everybody the computation is over.");
  for (long i = 0; i < workers_count; i++) { /* - Eventually tell all the workers to stop by sending a "finalize" task */
    char mailbox[80];

    snprintf(mailbox, sizeof mailbox, "worker-%ld", i);
    msg_task_t finalize = MSG_task_create("finalize", 0, 0, NULL);
    MSG_task_send(finalize, mailbox);
  }

  return 0;
}

/* Main function of the worker processes.
 *
 * Expects a single argument from the XML deployment file: argv[1] is the numerical rank of this worker, from which
 * the name of the mailbox it listens on ("worker-<rank>") is built.
 *
 * The worker receives tasks from that mailbox and executes them until a task named "finalize" arrives.
 * Always returns 0.
 */
static int worker(int argc, char *argv[])
{
  xbt_assert(argc==2, "The worker expects a single argument from the XML deployment file: its worker ID (its numerical rank)");
  char mailbox[80];

  long id= xbt_str_parse_int(argv[1], "Invalid argument %s");

  snprintf(mailbox, sizeof mailbox, "worker-%ld", id);

  while (1) { /* The worker waits in an infinite loop for tasks sent by the @ref master */
    msg_task_t task = NULL;
    int res = MSG_task_receive(&task, mailbox);
    xbt_assert(res == MSG_OK, "MSG_task_receive failed");

    int is_finalize = strcmp(MSG_task_get_name(task), "finalize") == 0;

    if (!is_finalize) {
      MSG_task_execute(task);  /* - Process any regular task */
    }
    MSG_task_destroy(task);    /* - Every received task is destroyed, whether it was executed or not */

    if (is_finalize) {
      break;                   /* - Exit if 'finalize' is received */
    }
  }
  XBT_INFO("I'm done. See you!");
  return 0;
}

/* Program entry point.
 *
 * Usage: <program> platform_file deployment_file
 *
 * Loads the platform, registers the "master" and "worker" functions, deploys the application and runs the
 * simulation. Returns 0 when MSG_main() reports MSG_OK, and 1 otherwise.
 */
int main(int argc, char *argv[])
{
  /* argc is passed by address and is only validated afterwards
   * (presumably MSG_init may consume SimGrid-specific options -- to be confirmed against the MSG documentation) */
  MSG_init(&argc, argv);
  xbt_assert(argc > 2, "Usage: %s platform_file deployment_file\n"
             "\tExample: %s msg_platform.xml msg_deployment.xml\n", argv[0], argv[0]);

  const char *platform_file   = argv[1];
  const char *deployment_file = argv[2];

  /* - Load the platform description */
  MSG_create_environment(platform_file);

  /* - Register the functions to be executed by the processes, under the names used in the deployment file */
  MSG_function_register("master", master);
  MSG_function_register("worker", worker);

  /* - Deploy the application */
  MSG_launch_application(deployment_file);

  /* - Run the simulation */
  msg_error_t status = MSG_main();

  XBT_INFO("Simulation time %g", MSG_get_clock());

  /* Map the simulation status onto the process exit code */
  if (status == MSG_OK) {
    return 0;
  }
  return 1;
}