File: peer.c

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (90 lines) | stat: -rw-r--r-- 2,867 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/* Copyright (c) 2012-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include "chainsend.h"

/* Log category "chainsend_peer" for this file; the XBT_DEBUG/XBT_INFO calls below presumably
 * log to it, since it is declared as the default category -- confirm against the XBT logging docs. */
XBT_LOG_NEW_DEFAULT_CATEGORY(chainsend_peer, "Messages specific for the peer");

/**
 * Receive the chain-building message and record this peer's place in the chain.
 *
 * Gets one chain_message_t from the peer's own mailbox (p->me), copies its previous/next
 * mailboxes and its piece count into @p p, then frees the message.
 *
 * @param p peer whose prev, next and total_pieces fields are filled in
 */
static void peer_join_chain(peer_t p)
{
  chain_message_t msg = (chain_message_t)sg_mailbox_get(p->me);
  p->prev             = msg->prev_;
  p->next             = msg->next_;
  p->total_pieces     = msg->num_pieces;
  /* Passing NULL to a "%s" conversion is undefined behavior, so a missing neighbour is logged
   * with an explicit placeholder string instead. */
  XBT_DEBUG("Peer %s got a 'BUILD_CHAIN' message (prev: %s / next: %s)", sg_mailbox_get_name(p->me),
            p->prev ? sg_mailbox_get_name(p->prev) : "(null)", p->next ? sg_mailbox_get_name(p->next) : "(null)");
  xbt_free(msg);
}

/**
 * Receive the pieces of the file and relay each one to the next peer, if any.
 *
 * Each round posts an asynchronous receive on the peer's own mailbox and waits for one pending
 * receive to complete. A completed piece is either re-sent asynchronously to p->next or, when
 * there is no next peer, freed. The loop ends once received_pieces reaches total_pieces; the
 * function then waits for all pending sends before returning.
 *
 * @param p peer state; received_pieces and received_bytes are updated for every piece
 */
static void peer_forward_file(peer_t p)
{
  void* piece;

  for (;;) {
    sg_activity_set_push(p->pending_recvs, (sg_activity_t)sg_mailbox_get_async(p->me, &piece));

    sg_activity_t finished = sg_activity_set_wait_any(p->pending_recvs);
    if (finished == NULL)
      continue; /* nothing completed this round: post another receive and wait again */

    sg_comm_unref((sg_comm_t)finished);
    XBT_DEBUG("Peer %s got a 'SEND_DATA' message", sg_mailbox_get_name(p->me));

    if (p->next == NULL) {
      /* No successor to relay to: the piece ends its journey here. */
      free(piece);
    } else {
      XBT_DEBUG("Sending %s (asynchronously) from %s to %s", (char*)piece, sg_mailbox_get_name(p->me),
                sg_mailbox_get_name(p->next));
      sg_comm_t relay = sg_mailbox_put_async(p->next, piece, MESSAGE_SEND_DATA_HEADER_SIZE + PIECE_SIZE);
      sg_activity_set_push(p->pending_sends, (sg_activity_t)relay);
    }

    p->received_pieces++;
    p->received_bytes += PIECE_SIZE;
    XBT_DEBUG("%u pieces received, %llu bytes received", p->received_pieces, p->received_bytes);

    if (p->received_pieces >= p->total_pieces)
      break;
  }

  /* Do not return while relayed pieces are still in flight. */
  sg_activity_set_wait_all(p->pending_sends);
}

/**
 * Allocate a peer and put the fields used in this file into a known state.
 *
 * The chain neighbours start as NULL, all counters start at zero, both activity sets are
 * freshly created, and the peer's mailbox is looked up under the name of the current host.
 *
 * @param argc unused
 * @param argv unused
 * @return the newly allocated peer; release it with peer_delete()
 */
static peer_t peer_init(int argc, char* argv[])
{
  /* The arguments are accepted for the caller's convenience but are not needed here. */
  (void)argc;
  (void)argv;

  peer_t p           = xbt_malloc(sizeof(s_peer_t));
  p->prev            = NULL;
  p->next            = NULL;
  p->total_pieces    = 0; /* overwritten by peer_join_chain(); never left indeterminate */
  p->received_pieces = 0;
  p->received_bytes  = 0;
  p->pending_recvs   = sg_activity_set_init();
  p->pending_sends   = sg_activity_set_init();

  p->me = sg_mailbox_by_name(sg_host_self_get_name());

  return p;
}

/**
 * Release a peer created by peer_init().
 *
 * @param p peer to destroy; must not be used afterwards
 */
static void peer_delete(peer_t p)
{
  /* The two activity sets go first, then the structure that holds them. */
  sg_activity_set_delete(p->pending_recvs);
  sg_activity_set_delete(p->pending_sends);
  xbt_free(p);
}

/**
 * Entry point of a peer: join the chain, receive/forward the file, then report.
 *
 * The clock is read after the peer is initialised and again once forwarding is complete;
 * the difference, the number of bytes received and the resulting average rate are logged
 * at INFO level before the peer is destroyed.
 *
 * @param argc passed through to peer_init()
 * @param argv passed through to peer_init()
 */
void peer(int argc, char* argv[])
{
  XBT_DEBUG("peer");

  peer_t self = peer_init(argc, argv);

  const double began = simgrid_get_clock();
  peer_join_chain(self);
  peer_forward_file(self);
  const double elapsed = simgrid_get_clock() - began;

  XBT_INFO("### %f %llu bytes (Avg %f MB/s); copy finished (simulated).", elapsed, self->received_bytes,
           self->received_bytes / 1024.0 / 1024.0 / elapsed);

  peer_delete(self);
}