File: cloud-masterworker.c

package info (click to toggle)
simgrid 3.21%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 21,972 kB
  • sloc: cpp: 88,193; ansic: 69,244; fortran: 6,089; f90: 5,162; xml: 4,861; java: 4,250; perl: 2,056; python: 1,193; sh: 1,159; makefile: 57; sed: 6
file content (213 lines) | stat: -rw-r--r-- 6,300 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
/* Copyright (c) 2007-2018. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "simgrid/msg.h"
#include "simgrid/plugins/live_migration.h"

#include <stdio.h> /* snprintf */

XBT_LOG_NEW_DEFAULT_CATEGORY(msg_test, "Messages specific for this msg example");

#define MAXMBOXLEN 64

/** @addtogroup MSG_examples
 *
 *  - <b>cloud-masterworker.c: Master/workers
 *    example on a cloud</b>. The classical example revisited to demonstrate the use of virtual machines.
 */

/* Size parameters shared by every task built in send_tasks(): they are passed, in this order, as the
 * second and third arguments of MSG_task_create().
 * NOTE(review): presumably flops and bytes respectively, per the MSG API -- confirm. */
const double task_comp_size = 1e7; /* computation amount of one task */
const double task_comm_size = 1e7; /* communication amount of one task */

/**
 * Create one task per worker and send it to the matching mailbox.
 *
 * For every rank k in [0, nb_workers), a task named "Task<k>" is created with the file-level
 * task_comp_size / task_comm_size values and sent to mailbox "MBOX:WRK<k>" (k printed on at least
 * two digits). The result of MSG_task_send() is not inspected.
 *
 * @param nb_workers number of mailboxes to serve; the loop body never runs when it is <= 0
 */
static void send_tasks(int nb_workers)
{
  int rank = 0;

  while (rank < nb_workers) {
    /* Both names are heap-allocated by bprintf() and released at the end of the iteration. */
    char *mailbox_name = bprintf("MBOX:WRK%02d", rank);
    char *task_name    = bprintf("Task%02d", rank);

    msg_task_t job = MSG_task_create(task_name, task_comp_size, task_comm_size, NULL);

    XBT_INFO("Send task(%s) to mailbox(%s)", task_name, mailbox_name);
    MSG_task_send(job, mailbox_name);

    free(mailbox_name);
    free(task_name);

    rank++;
  }
}

/**
 * Worker process: receive tasks on its own mailbox and execute them until asked to stop.
 *
 * The mailbox name is "MBOX:" followed by the name of the calling process. Every received task is
 * executed and then destroyed, except a task named "finalize": that one is destroyed without being
 * executed and ends the loop. The result of each reception is checked with xbt_assert().
 *
 * @param argc unused
 * @param argv unused
 * @return 0, once a "finalize" task has been received
 */
static int worker_fun(int argc, char *argv[])
{
  const char *pr_name = MSG_process_get_name(MSG_process_self());
  char mbox[MAXMBOXLEN];
  snprintf(mbox, sizeof mbox, "MBOX:%s", pr_name);

  XBT_INFO("%s is listening on mailbox(%s)", pr_name, mbox);

  for (;;) {
    msg_task_t task = NULL;

    msg_error_t res = MSG_task_receive(&task, mbox);
    /* The message names the call that actually failed (it used to mention a non-existent MSG_task_get). */
    xbt_assert(res == MSG_OK, "MSG_task_receive failed");

    /* The name stays valid until the task is destroyed, so it is looked up only once. */
    const char *task_name = MSG_task_get_name(task);

    XBT_INFO("%s received task(%s) from mailbox(%s)", pr_name, task_name, mbox);

    if (strcmp(task_name, "finalize") == 0) {
      MSG_task_destroy(task);
      break;
    }

    MSG_task_execute(task);
    XBT_INFO("%s executed task(%s)", pr_name, task_name);
    MSG_task_destroy(task);
  }
  return 0;
}

static int master_fun(int argc, char *argv[])
{
  msg_vm_t vm;
  unsigned int i;

  xbt_dynar_t worker_pms = MSG_process_get_data(MSG_process_self());
  int nb_workers = xbt_dynar_length(worker_pms);

  xbt_dynar_t vms = xbt_dynar_new(sizeof(msg_vm_t), NULL);

  /* Launch VMs and worker processes. One VM per PM, and one worker process per VM. */
  XBT_INFO("# Launch %d VMs", nb_workers);
  for (int i = 0; i< nb_workers; i++) {
    char *vm_name = bprintf("VM%02d", i);
    char *pr_name = bprintf("WRK%02d", i);

    msg_host_t pm = xbt_dynar_get_as(worker_pms, i, msg_host_t);

    XBT_INFO("create %s on PM(%s)", vm_name, MSG_host_get_name(pm));
    msg_vm_t vm = MSG_vm_create_core(pm, vm_name);

    MSG_vm_set_ramsize(vm, 1L * 1024 * 1024 * 1024); // 1GiB

    MSG_vm_start(vm);
    xbt_dynar_push(vms, &vm);

    XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
    MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);

    xbt_free(vm_name);
    xbt_free(pr_name);
  }

  /* Send a bunch of work to every one */
  XBT_INFO("# Send a task to %d worker process", nb_workers);
  send_tasks(nb_workers);

  XBT_INFO("# Suspend all VMs");
  xbt_dynar_foreach(vms, i, vm) {
    XBT_INFO("suspend %s", MSG_vm_get_name(vm));
    MSG_vm_suspend(vm);
  }

  XBT_INFO("# Wait a while");
  MSG_process_sleep(2);

  XBT_INFO("# Resume all VMs");
  xbt_dynar_foreach(vms, i, vm) {
    MSG_vm_resume(vm);
  }

  XBT_INFO("# Sleep long enough for everyone to be done with previous batch of work");
  MSG_process_sleep(10 - MSG_get_clock());

  XBT_INFO("# Add one more process on each VM");
  xbt_dynar_foreach(vms, i, vm) {
    unsigned int index = i + xbt_dynar_length(vms);
    char* vm_name      = bprintf("VM%02u", i);
    char* pr_name      = bprintf("WRK%02u", index);

    XBT_INFO("put a process (%s) on %s", pr_name, vm_name);
    MSG_process_create(pr_name, worker_fun, NULL, (msg_host_t)vm);

    xbt_free(vm_name);
    xbt_free(pr_name);
  }

  XBT_INFO("# Send a task to %d worker process", nb_workers * 2);
  send_tasks(nb_workers * 2);

  msg_host_t worker_pm0 = xbt_dynar_get_as(worker_pms, 0, msg_host_t);
  msg_host_t worker_pm1 = xbt_dynar_get_as(worker_pms, 1, msg_host_t);

  XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm0));
  xbt_dynar_foreach(vms, i, vm) {
    MSG_vm_migrate(vm, worker_pm0);
  }

  XBT_INFO("# Migrate all VMs to PM(%s)", MSG_host_get_name(worker_pm1));
  xbt_dynar_foreach(vms, i, vm) {
    MSG_vm_migrate(vm, worker_pm1);
  }

  XBT_INFO("# Shutdown the half of worker processes gracefully. The remaining half will be forcibly killed.");
  for (i = 0; i < nb_workers; i++) {
    char mbox[MAXMBOXLEN];
    snprintf(mbox, MAXMBOXLEN, "MBOX:WRK%02u", i);
    msg_task_t finalize = MSG_task_create("finalize", 0, 0, 0);
    MSG_task_send(finalize, mbox);
  }

  XBT_INFO("# Wait a while before effective shutdown.");
  MSG_process_sleep(2);

  XBT_INFO("# Shutdown and destroy all the VMs. The remaining worker processes will be forcibly killed.");
  xbt_dynar_foreach(vms, i, vm) {
    XBT_INFO("shutdown %s", MSG_vm_get_name(vm));
    MSG_vm_shutdown(vm);
    XBT_INFO("destroy %s", MSG_vm_get_name(vm));
    MSG_vm_destroy(vm);
  }

  XBT_INFO("# Goodbye now!");
  xbt_dynar_free(&vms);
  return 0;
}

/**
 * Program entry point: build the simulated platform, start the master process and run the simulation.
 *
 * The platform description file is taken from argv[1]. The first host returned by
 * MSG_hosts_as_dynar() runs the master process; the next nb_workers hosts are handed to it, as
 * process data, to serve as worker PMs.
 *
 * @param argc number of command-line arguments
 * @param argv command-line arguments; argv[1] is the platform file
 * @return 0 if MSG_main() returned MSG_OK, 1 otherwise
 */
int main(int argc, char *argv[])
{
  const int nb_workers = 2;

  MSG_init(&argc, argv);
  MSG_vm_live_migration_plugin_init();

  xbt_assert(argc > 1, "Usage: %s example/platforms/cluster_backbone.xml\n", argv[0]);

  /* Load the platform file */
  MSG_create_environment(argv[1]);

  /* Retrieve hosts from the platform file */
  xbt_dynar_t pms = MSG_hosts_as_dynar();

  /* We need a master node plus nb_workers worker nodes; the cast keeps the comparison unsigned on both sides. */
  xbt_assert(xbt_dynar_length(pms) > (unsigned long)nb_workers, "need %d hosts", nb_workers + 1);

  /* the first pm is the master, the others are workers */
  msg_host_t master_pm = xbt_dynar_get_as(pms, 0, msg_host_t);

  xbt_dynar_t worker_pms = xbt_dynar_new(sizeof(msg_host_t), NULL);
  for (int i = 1; i <= nb_workers; i++) {
    msg_host_t pm = xbt_dynar_get_as(pms, i, msg_host_t);
    xbt_dynar_push(worker_pms, &pm);
  }
  xbt_dynar_free(&pms);

  /* Start the master process on the master pm; it reads worker_pms through its process data. */
  MSG_process_create("master", master_fun, worker_pms, master_pm);

  msg_error_t res = MSG_main();
  XBT_INFO("Bye (simulation time %g)", MSG_get_clock());

  /* Released only after the simulation has ended, since master_fun uses it while running. */
  xbt_dynar_free(&worker_pms);

  return res != MSG_OK;
}