1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
/* Copyright (c) 2012-2025. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "chainsend.h"
XBT_LOG_NEW_DEFAULT_CATEGORY(broadcaster, "Messages specific for the broadcaster");
static chain_message_t chain_message_new(sg_mailbox_t prev, sg_mailbox_t next, const unsigned int num_pieces)
{
chain_message_t msg = xbt_malloc(sizeof(s_chain_message_t));
msg->prev_ = prev;
msg->next_ = next;
msg->num_pieces = num_pieces;
return msg;
}
static void broadcaster_build_chain(broadcaster_t bc)
{
/* Build the chain if there's at least one peer */
if (bc->host_count > 0)
bc->first = bc->mailboxes[0];
for (unsigned i = 0; i < bc->host_count; i++) {
sg_mailbox_t prev = i > 0 ? bc->mailboxes[i - 1] : NULL;
sg_mailbox_t next = i < bc->host_count - 1 ? bc->mailboxes[i + 1] : NULL;
XBT_DEBUG("Building chain--broadcaster:\"%s\" dest:\"%s\" prev:\"%s\" next:\"%s\"", sg_host_self_get_name(),
sg_mailbox_get_name(bc->mailboxes[i]), prev ? sg_mailbox_get_name(prev) : NULL,
next ? sg_mailbox_get_name(next) : NULL);
/* Send message to current peer */
sg_mailbox_put(bc->mailboxes[i], chain_message_new(prev, next, bc->piece_count), MESSAGE_BUILD_CHAIN_SIZE);
}
}
static void broadcaster_send_file(const_broadcaster_t bc)
{
for (unsigned int current_piece = 0; current_piece < bc->piece_count; current_piece++) {
XBT_DEBUG("Sending (send) piece %u from %s into mailbox %s", current_piece, sg_host_self_get_name(),
sg_mailbox_get_name(bc->first));
char* file_piece = bprintf("piece-%u", current_piece);
sg_comm_t comm = sg_mailbox_put_async(bc->first, file_piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
sg_activity_set_push(bc->pending_sends, (sg_activity_t)comm);
}
sg_activity_set_wait_all(bc->pending_sends);
}
static broadcaster_t broadcaster_init(sg_mailbox_t* mailboxes, unsigned int host_count, unsigned int piece_count)
{
broadcaster_t bc = xbt_malloc(sizeof(s_broadcaster_t));
bc->first = NULL;
bc->host_count = host_count;
bc->piece_count = piece_count;
bc->mailboxes = mailboxes;
bc->pending_sends = sg_activity_set_init();
broadcaster_build_chain(bc);
return bc;
}
static void broadcaster_destroy(broadcaster_t bc)
{
sg_activity_set_delete(bc->pending_sends);
xbt_free(bc->mailboxes);
xbt_free(bc);
}
/** Emitter function */
void broadcaster(int argc, char* argv[])
{
XBT_DEBUG("broadcaster");
xbt_assert(argc > 2);
unsigned int host_count = (unsigned int)xbt_str_parse_int(argv[1], "Invalid number of peers");
sg_mailbox_t* mailboxes = xbt_malloc(sizeof(sg_mailbox_t) * host_count);
for (unsigned int i = 1; i <= host_count; i++) {
char* name = bprintf("node-%u.simgrid.org", i);
XBT_DEBUG("%s", name);
mailboxes[i - 1] = sg_mailbox_by_name(name);
free(name);
}
unsigned int piece_count = (unsigned int)xbt_str_parse_int(argv[2], "Invalid number of pieces");
broadcaster_t bc = broadcaster_init(mailboxes, host_count, piece_count);
broadcaster_send_file(bc);
broadcaster_destroy(bc);
}
|