File: s4u-plugin-prodcons.cpp

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (91 lines) | stat: -rw-r--r-- 3,141 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/* Copyright (c) 2007-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

#include <simgrid/plugins/ProducerConsumer.hpp>
#include <simgrid/s4u/ActivitySet.hpp>
#include <simgrid/s4u/Actor.hpp>
#include <simgrid/s4u/Engine.hpp>
#include <simgrid/s4u/Host.hpp>
#include <xbt/random.hpp>

// Default logging category of this file: the XBT_INFO calls below log through it.
XBT_LOG_NEW_DEFAULT_CATEGORY(s4u_test, "Messages specific for this s4u example");

// Short alias for the S4U namespace, used throughout this file.
namespace sg4 = simgrid::s4u;

/* Producer actor: pushes six integers into the shared producer-consumer object.
 *
 * The first three items are sent with the blocking put(); the last three are sent with
 * put_async() and their activities are gathered in an ActivitySet that is waited for before
 * the actor terminates. The actor sleeps for a random duration before starting and after
 * each item.
 *
 * id: index of this ingester; the i-th item of each phase carries the value 10 * id + i.
 * pc: producer-consumer object shared with the retrievers.
 *
 * Each item is heap-allocated and handed over to `pc`; the retrievers of this file delete what
 * they receive. The value is therefore kept in a local copy for logging instead of being read
 * back through the pointer after the hand-over, when the item may already have been freed.
 */
static void ingester(int id, simgrid::plugin::ProducerConsumerPtr<int> pc)
{
  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));

  // Phase 1: blocking puts, with pauses scaled by a decreasing factor (3, 2, 1)
  for (int i = 0; i < 3; i++) {
    const int value = 10 * id + i;
    pc->put(new int(value), 1.2125e6); // last for 0.01s
    // Log the local copy: the consumer may have deleted the item once put() has returned
    XBT_INFO("data sucessfully put: %d", value);
    sg4::this_actor::sleep_for((3 - i) * simgrid::xbt::random::uniform_real(0, 1));
  }

  // Phase 2: asynchronous puts, with pauses scaled by an increasing factor (3, 4, 5)
  sg4::ActivitySet pending;
  for (int i = 0; i < 3; i++) {
    const int value = 10 * id + i;
    pending.push(pc->put_async(new int(value), 1.2125e6)); // last for 0.01s
    XBT_INFO("data sucessfully put: %d", value);
    sg4::this_actor::sleep_for((i + 3) * simgrid::xbt::random::uniform_real(0, 1));
  }
  // Do not leave before every asynchronous put has completed
  pending.wait_all();
}

/* Consumer actor: pulls six integers out of the shared producer-consumer object.
 *
 * The first three items are fetched with get_async() followed by an explicit wait(); the last
 * three with the blocking get(). Every received item is logged and then deleted here. The
 * actor sleeps for a random duration before starting and after each item.
 *
 * pc: producer-consumer object shared with the ingesters.
 */
static void retriever(simgrid::plugin::ProducerConsumerPtr<int> pc)
{
  sg4::this_actor::sleep_for(simgrid::xbt::random::uniform_real(0, 1));

  // Phase 1: asynchronous gets, with pauses scaled by an increasing factor (3, 4, 5)
  for (int round = 0; round < 3; ++round) {
    int* received = nullptr;
    pc->get_async(&received)->wait();
    XBT_INFO("data sucessfully get: %d", *received);
    delete received;

    const double pause = (round + 3) * simgrid::xbt::random::uniform_real(0, 1);
    sg4::this_actor::sleep_for(pause);
  }

  // Phase 2: blocking gets, with pauses scaled by a decreasing factor (3, 2, 1)
  for (int round = 0; round < 3; ++round) {
    int* received = pc->get();
    XBT_INFO("data sucessfully get: %d", *received);
    delete received;

    const double pause = (3 - round) * simgrid::xbt::random::uniform_real(0, 1);
    sg4::this_actor::sleep_for(pause);
  }
}

int main(int argc, char* argv[])
{
  sg4::Engine e(&argc, argv);

  // Platform creation
  auto* cluster = e.get_netzone_root()->add_netzone_star("cluster");
  for (int i = 0; i < 8; i++) {
    std::string hostname = "node-" + std::to_string(i) + ".simgrid.org";

    const auto* host = cluster->add_host(hostname, "1Gf");

    std::string linkname = "cluster_link_" + std::to_string(i);
    const auto* link     = cluster->add_split_duplex_link(linkname, "1Gbps");

    cluster->add_route(host, nullptr,  {{link, sg4::LinkInRoute::Direction::UP}}, true);
  }
  cluster->seal();

  simgrid::plugin::ProducerConsumerPtr<int> pc = simgrid::plugin::ProducerConsumer<int>::create(2);

  XBT_INFO("Maximum number of queued data is %u", pc->get_max_queue_size());
  XBT_INFO("Transfers are done in %s mode", pc->get_transfer_mode().c_str());

  for (int i = 0; i < 3; i++) {
    std::string hostname = "node-" + std::to_string(i) + ".simgrid.org";
    e.host_by_name(hostname)->add_actor("ingester-" + std::to_string(i), &ingester, i, pc);

    hostname = "node-" + std::to_string(i + 3) + ".simgrid.org";
    e.host_by_name(hostname)->add_actor("retriever-" + std::to_string(i), &retriever, pc);
  }

  e.run();

  return 0;
}