File: broadcaster.c

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (94 lines) | stat: -rw-r--r-- 3,279 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/* Copyright (c) 2012-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "chainsend.h"

XBT_LOG_NEW_DEFAULT_CATEGORY(broadcaster, "Messages specific for the broadcaster");

/**
 * Allocate and fill a chain-building message.
 *
 * @param prev       mailbox stored in the prev_ field (may be NULL)
 * @param next       mailbox stored in the next_ field (may be NULL)
 * @param num_pieces value stored in the num_pieces field
 * @return the freshly allocated message; it is not released by this function
 */
static chain_message_t chain_message_new(sg_mailbox_t prev, sg_mailbox_t next, const unsigned int num_pieces)
{
  chain_message_t message = xbt_malloc(sizeof(s_chain_message_t));

  /* The three fields are independent of each other: just copy the arguments in */
  message->num_pieces = num_pieces;
  message->next_      = next;
  message->prev_      = prev;

  return message;
}

/**
 * Tell every peer who its neighbours are in the chain.
 *
 * The first mailbox, if any, is recorded in bc->first. Then, for each index i,
 * a chain message is put into bc->mailboxes[i]; it carries mailbox i-1 as
 * predecessor (NULL for the first peer), mailbox i+1 as successor (NULL for the
 * last peer) and bc->piece_count.
 *
 * @param bc broadcaster whose host_count, piece_count and mailboxes fields are already set
 */
static void broadcaster_build_chain(broadcaster_t bc)
{
  /* Build the chain if there's at least one peer */
  if (bc->host_count > 0)
    bc->first = bc->mailboxes[0];

  for (unsigned i = 0; i < bc->host_count; i++) {
    sg_mailbox_t prev = i > 0 ? bc->mailboxes[i - 1] : NULL;
    /* "i + 1 < count" rather than "i < count - 1": no reliance on unsigned subtraction */
    sg_mailbox_t next = i + 1 < bc->host_count ? bc->mailboxes[i + 1] : NULL;
    /* Never hand a null pointer to a "%s" conversion (undefined behavior): both ends of the chain have a missing
     * neighbour, so print an explicit placeholder instead. */
    XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", sg_host_self_get_name(),
              sg_mailbox_get_name(bc->mailboxes[i]), prev ? sg_mailbox_get_name(prev) : "(null)",
              next ? sg_mailbox_get_name(next) : "(null)");
    /* Send message to current peer. NOTE(review): the message is not freed here; presumably the receiver does it --
     * confirm against the peer code. */
    sg_mailbox_put(bc->mailboxes[i], chain_message_new(prev, next, bc->piece_count), MESSAGE_BUILD_CHAIN_SIZE);
  }
}

/**
 * Push every piece of the file into the first mailbox of the chain.
 *
 * One asynchronous put is started per piece and registered in bc->pending_sends;
 * the function returns once sg_activity_set_wait_all() on that set returns.
 *
 * @param bc broadcaster providing the target mailbox (first), the number of pieces and the activity set
 */
static void broadcaster_send_file(const_broadcaster_t bc)
{
  for (unsigned int piece = 0; piece < bc->piece_count; ++piece) {
    XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", piece, sg_host_self_get_name(),
              sg_mailbox_get_name(bc->first));

    /* The payload is a heap-allocated label naming the piece.
     * NOTE(review): it is not freed here; presumably the receiving side owns it -- confirm. */
    char* payload = bprintf("piece-%u", piece);

    /* Start the transfer and remember it so that all of them can be awaited together below */
    sg_activity_set_push(
        bc->pending_sends,
        (sg_activity_t)sg_mailbox_put_async(bc->first, payload, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE));
  }

  sg_activity_set_wait_all(bc->pending_sends);
}

/**
 * Create a broadcaster and immediately build the chain of peers.
 *
 * @param mailboxes   array of peer mailboxes; the pointer is stored as is (not copied)
 * @param host_count  number of entries in mailboxes
 * @param piece_count number of pieces that will be sent
 * @return a newly allocated broadcaster, to be released with broadcaster_destroy()
 */
static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
{
  broadcaster_t self = xbt_malloc(sizeof(s_broadcaster_t));

  /* Plain copies of the arguments */
  self->mailboxes   = mailboxes;
  self->host_count  = host_count;
  self->piece_count = piece_count;

  /* State owned by the broadcaster itself; "first" stays NULL unless the chain has at least one peer */
  self->pending_sends = sg_activity_set_init();
  self->first         = NULL;

  /* Every field must be set before this call, as it reads them */
  broadcaster_build_chain(self);

  return self;
}

/**
 * Release a broadcaster along with the resources it holds.
 *
 * @param bc broadcaster to destroy; its mailbox array and activity set are released too
 */
static void broadcaster_destroy(broadcaster_t bc)
{
  /* Members first (in no particular order), ... */
  xbt_free(bc->mailboxes);
  sg_activity_set_delete(bc->pending_sends);

  /* ... then the structure that held them */
  xbt_free(bc);
}

/**
 * Emitter function: entry point of the broadcaster.
 *
 * Expected arguments:
 *  - argv[1]: number of peers N; the mailboxes "node-1.simgrid.org" to "node-<N>.simgrid.org" are looked up
 *  - argv[2]: number of pieces to send
 *
 * Both values must be non-negative, and at least one peer is required as soon as there is something to send.
 */
void broadcaster(int argc, char* argv[])
{
  XBT_DEBUG("broadcaster");
  xbt_assert(argc > 2);

  /* Validate before narrowing: a negative value converted to unsigned would silently become a huge count */
  long parsed_hosts = xbt_str_parse_int(argv[1], "Invalid number of peers");
  xbt_assert(parsed_hosts >= 0, "The number of peers must not be negative (got %ld)", parsed_hosts);
  unsigned int host_count = (unsigned int)parsed_hosts;

  /* Ownership of this array is handed over to the broadcaster below; broadcaster_destroy() frees it */
  sg_mailbox_t* mailboxes = xbt_malloc(sizeof *mailboxes * host_count);

  /* Peer names are 1-based while the array is 0-based */
  for (unsigned int i = 1; i <= host_count; i++) {
    char* name = bprintf("node-%u.simgrid.org", i);
    XBT_DEBUG("%s", name);
    mailboxes[i - 1] = sg_mailbox_by_name(name);
    xbt_free(name);
  }

  long parsed_pieces = xbt_str_parse_int(argv[2], "Invalid number of pieces");
  xbt_assert(parsed_pieces >= 0, "The number of pieces must not be negative (got %ld)", parsed_pieces);
  unsigned int piece_count = (unsigned int)parsed_pieces;

  /* Without any peer the chain has no first mailbox, so sending a piece would use a NULL mailbox */
  xbt_assert(host_count > 0 || piece_count == 0, "Cannot send %u piece(s) without any peer", piece_count);

  broadcaster_t bc = broadcaster_init(mailboxes, host_count, piece_count);

  broadcaster_send_file(bc);

  broadcaster_destroy(bc);
}