File: s4u-task-storm.cpp

package info (click to toggle)
simgrid 4.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 38,980 kB
  • sloc: cpp: 123,583; ansic: 66,779; python: 8,358; java: 6,406; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,337; perl: 1,436; makefile: 105; lisp: 49; javascript: 7; sed: 6
file content (133 lines) | stat: -rw-r--r-- 4,889 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/* Copyright (c) 2017-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

/* This example takes the main concepts of Apache Storm presented here
   https://storm.apache.org/releases/2.4.0/Concepts.html and use them to build a simulation of a stream processing
   application

   Spout SA produces data every 100ms. The volume produced is alternatively 1e3, 1e6 and 1e9 bytes.
   Spout SB produces 1e6 bytes every 200ms.

   Bolt B1 and B2 processes data from Spout SA alternatively. The quantity of work to process this data is 10 flops per
   bytes Bolt B3 processes data from Spout SB. Bolt B4 processes data from Bolt B3.

                        Fafard
                        ┌────┐
                    ┌──►│ B1 │
         Tremblay   │   └────┘
          ┌────┐    │
          │ SA ├────┤  Ginette
          └────┘    │   ┌────┐
                    └──►│ B2 │
                        └────┘


                       Bourassa
         Jupiter     ┌──────────┐
          ┌────┐     │          │
          │ SB ├─────┤ B3 ──► B4│
          └────┘     │          │
                     └──────────┘
 */

#include "simgrid/s4u.hpp"

XBT_LOG_NEW_DEFAULT_CATEGORY(task_storm, "Messages specific for this s4u example");
namespace sg4 = simgrid::s4u;

int main(int argc, char* argv[])
{
  sg4::Engine e(&argc, argv);
  e.load_platform(argv[1]);

  // Retrieve hosts
  auto tremblay = e.host_by_name("Tremblay");
  auto jupiter  = e.host_by_name("Jupiter");
  auto fafard   = e.host_by_name("Fafard");
  auto ginette  = e.host_by_name("Ginette");
  auto bourassa = e.host_by_name("Bourassa");

  // Create execution tasks
  auto SA = sg4::ExecTask::init("SA", tremblay->get_speed() * 0.1, tremblay);
  auto SB = sg4::ExecTask::init("SB", jupiter->get_speed() * 0.2, jupiter);
  auto B1 = sg4::ExecTask::init("B1", 1e8, fafard);
  auto B2 = sg4::ExecTask::init("B2", 1e8, ginette);
  auto B3 = sg4::ExecTask::init("B3", 1e8, bourassa);
  auto B4 = sg4::ExecTask::init("B4", 2e8, bourassa);

  // Create communication tasks
  auto SA_to_B1 = sg4::CommTask::init("SA_to_B1", 0, tremblay, fafard);
  auto SA_to_B2 = sg4::CommTask::init("SA_to_B2", 0, tremblay, ginette);
  auto SB_to_B3 = sg4::CommTask::init("SB_to_B3", 1e6, jupiter, bourassa);

  // Create the graph by defining dependencies between tasks
  // Some dependencies are defined dynamically
  SA_to_B1->add_successor(B1);
  SA_to_B2->add_successor(B2);
  SB->add_successor(SB_to_B3);
  SB_to_B3->add_successor(B3);
  B3->add_successor(B4);

  /* Dynamic modification of the graph and bytes sent
     Alternatively we: remove/add the link between SA and SA_to_B2
                       add/remove the link between SA and SA_to_B1
  */
  SA->on_this_completion_cb([&SA_to_B1, &SA_to_B2](sg4::Task* t) {
    int count = t->get_count();
    sg4::CommTaskPtr comm;
    if (count % 2 == 1) {
      t->remove_successor(SA_to_B2);
      t->add_successor(SA_to_B1);
      comm = SA_to_B1;
    } else {
      t->remove_successor(SA_to_B1);
      t->add_successor(SA_to_B2);
      comm = SA_to_B2;
    }
    std::vector<double> amount = {1e9, 1e3, 1e6};
    comm->set_amount(amount[count % 3]);
    auto token = std::make_shared<sg4::Token>();
    token->set_data(new double(amount[count % 3]));
    t->set_token(token);
  });

  // The token sent by SA is forwarded by both communication tasks
  SA_to_B1->on_this_completion_cb([&SA](sg4::Task* t) {
    t->set_token(t->get_token_from(SA));
    t->deque_token_from(SA);
  });
  SA_to_B2->on_this_completion_cb([&SA](sg4::Task* t) {
    t->set_token(t->get_token_from(SA));
    t->deque_token_from(SA);
  });

  /* B1 and B2 read the value of the token received by their predecessors
     and use it to adapt their amount of work to do.
  */
  B1->on_this_start_cb([&SA_to_B1](sg4::Task* t) {
    auto data = t->get_token_from(SA_to_B1)->get_data<double>();
    t->deque_token_from(SA_to_B1);
    t->set_amount(*data * 10);
    delete data;
  });
  B2->on_this_start_cb([&SA_to_B2](sg4::Task* t) {
    auto data = t->get_token_from(SA_to_B2)->get_data<double>();
    t->deque_token_from(SA_to_B2);
    t->set_amount(*data * 10);
    delete data;
  });

  // Enqueue firings for tasks without predecessors
  SA->enqueue_firings(5);
  SB->enqueue_firings(5);

  // Add a function to be called when tasks end for log purpose
  sg4::Task::on_completion_cb(
      [](const sg4::Task* t) { XBT_INFO("Task %s finished (%d)", t->get_name().c_str(), t->get_count()); });

  // Start the simulation
  e.run();
  return 0;
}