File: tuto_disk.cpp

package info (click to toggle)
simgrid 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 39,192 kB
  • sloc: cpp: 124,913; ansic: 66,744; python: 8,560; java: 6,773; fortran: 6,079; f90: 5,123; xml: 4,587; sh: 2,194; perl: 1,436; makefile: 111; lisp: 49; javascript: 7; sed: 6
file content (175 lines) | stat: -rw-r--r-- 6,448 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
/* Copyright (c) 2017-2025. The SimGrid Team. All rights reserved.          */

/* This program is free software; you can redistribute it and/or modify it
 * under the terms of the license (GNU LGPL) which comes with this package. */

/* This tutorial presents how to faithful emulate disk operation in SimGrid
 */

#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <random>
#include <simgrid/s4u.hpp>

namespace sg4 = simgrid::s4u;

XBT_LOG_NEW_DEFAULT_CATEGORY(disk_test, "Messages specific for this simulation");

/** @brief Calculates the bandwidth for disk doing async operations */
static void estimate_bw(const sg4::Disk* disk, int n, int n_flows, bool read)
{
  unsigned long long size = 100000;
  double cur_time         = sg4::Engine::get_clock();
  std::vector<sg4::IoPtr> activities;
  for (int i = 0; i < n_flows; i++) {
    sg4::IoPtr act;
    if (read)
      act = disk->read_async(size);
    else
      act = disk->write_async(size);

    activities.push_back(act);
  }

  for (const auto& act : activities)
    act->wait();

  double elapsed_time = sg4::Engine::get_clock() - cur_time;
  printf("%s,%s,%d,%d,%llu,%lf\n", disk->get_cname(), read ? "read" : "write", n, n_flows, size, elapsed_time);
}

static void host()
{
  /* - Estimating bw for each disk and considering concurrent flows */
  for (int i = 0; i < 20; i++) {
    for (int flows = 1; flows <= 15; flows++) {
      for (auto* disk : sg4::Host::current()->get_disks()) {
        estimate_bw(disk, i, flows, true);
        estimate_bw(disk, i, flows, false);
      }
    }
  }
}

/*************************************************************************************************/
/** @brief Auxiliary class to generate noise in disk operations */
class DiskNoise {
  double bw_;
  std::vector<double> breaks_;
  std::vector<double> heights_;
  std::mt19937& gen_;

public:
  DiskNoise(double capacity, std::mt19937& gen, const std::vector<double>& b, const std::vector<double>& h)
      : bw_(capacity), breaks_(b), heights_(h), gen_(gen)
  {
  }
  double operator()(sg_size_t /*size*/) const
  {
    std::piecewise_constant_distribution<double> d(breaks_.begin(), breaks_.end(), heights_.begin());
    auto value = d(gen_);
    return value / bw_;
  }
};

/** @brief Auxiliary method to get list of values from json in a vector */
static std::vector<double> get_list_from_json(const boost::property_tree::ptree& pt, const std::string& path)
{
  std::vector<double> v;
  for (const auto& it : pt.get_child(path)) {
    double value = it.second.get_value<double>();
    v.push_back(value * 1e6);
  }
  return v;
}
/*************************************************************************************************/
/**
 * @brief Non-linear resource callback for disks
 *
 * @param degradation Vector with effective read/write bandwidth
 * @param capacity Resource current capacity in SimGrid
 * @param n Number of activities sharing this resource
 */
static double disk_dynamic_sharing(const std::vector<double>& degradation, double capacity, int n)
{
  n--;
  if (n >= degradation.size())
    return capacity;
  return degradation[n];
}

/**
 * @brief Noise for I/O operations
 *
 * @param data Map with noise information
 * @param size I/O size in bytes
 * @param op I/O operation: read/write
 */
static double disk_variability(const std::unordered_map<sg4::Io::OpType, DiskNoise>& data, sg_size_t size,
                               sg4::Io::OpType op)
{
  auto it = data.find(op);
  if (it == data.end())
    return 1.0;
  double value = it->second(size);
  return value;
}

/** @brief Creates a disk */
static void create_disk(sg4::Host* host, std::mt19937& gen, const std::string& disk_name,
                        const boost::property_tree::ptree& pt)
{
  double read_bw                = pt.get_child("read_bw").begin()->second.get_value<double>() * 1e6;
  double write_bw               = pt.get_child("write_bw").begin()->second.get_value<double>() * 1e6;
  auto* disk                    = host->add_disk(disk_name, read_bw, write_bw);
  std::vector<double> read_deg  = get_list_from_json(pt, "degradation.read");
  std::vector<double> write_deg = get_list_from_json(pt, "degradation.write");

  /* get maximum possible disk speed for read/write disk constraint */
  double max_bw = std::max(*std::max_element(read_deg.begin(), read_deg.end()),
                           *std::max_element(write_deg.begin(), write_deg.end()));
  disk->set_readwrite_bandwidth(max_bw);

  disk->set_sharing_policy(sg4::Disk::Operation::READ, sg4::Disk::SharingPolicy::NONLINEAR,
                           std::bind(&disk_dynamic_sharing, read_deg, std::placeholders::_1, std::placeholders::_2));
  disk->set_sharing_policy(sg4::Disk::Operation::WRITE, sg4::Disk::SharingPolicy::NONLINEAR,
                           std::bind(&disk_dynamic_sharing, write_deg, std::placeholders::_1, std::placeholders::_2));
  /* this is the default behavior, expliciting only to make it clearer */
  disk->set_sharing_policy(sg4::Disk::Operation::READWRITE, sg4::Disk::SharingPolicy::LINEAR);

  /* configuring variability */
  DiskNoise read_noise(read_bw, gen, get_list_from_json(pt, "noise.read.breaks"),
                       get_list_from_json(pt, "noise.read.heights"));
  DiskNoise write_noise(write_bw, gen, get_list_from_json(pt, "noise.write.breaks"),
                        get_list_from_json(pt, "noise.write.heights"));

  std::unordered_map<sg4::Io::OpType, DiskNoise> noise{{sg4::Io::OpType::READ, read_noise},
                                                       {sg4::Io::OpType::WRITE, write_noise}};
  disk->set_factor_cb(std::bind(&disk_variability, noise, std::placeholders::_1, std::placeholders::_2));
  disk->seal();
}

/*************************************************************************************************/
int main(int argc, char** argv)
{
  sg4::Engine e(&argc, argv);
  std::mt19937 gen(42);
  /* simple platform containing 1 host and 2 disk */
  auto* zone = e.get_netzone_root()->add_netzone_full("bob_zone");
  auto* bob  = zone->add_host("bob", 1e6);
  std::ifstream jsonFile("IO_noise.json");
  boost::property_tree::ptree pt;
  boost::property_tree::read_json(jsonFile, pt);
  for (const auto& it : pt.get_child("")) {
    add_disk(bob, gen, it.first, it.second);
  }
  zone->seal();

  sg4::Actor::create("", bob, host);

  printf("disk,op,n,flows,size,elapsed\n");

  e.run();

  return 0;
}