1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
/* Copyright (c) 2006-2025. The SimGrid Team. All rights reserved. */
/* This program is free software; you can redistribute it and/or modify it
* under the terms of the license (GNU LGPL) which comes with this package. */
#include "Utils.hpp"
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <map>
#include <random>
#include <simgrid/s4u.hpp>
#include <smpi/smpi.h>
namespace sg4 = simgrid::s4u;
/**
 * @brief Sampler that draws values from a weighted mixture of normal distributions
 *
 * Components are added one at a time with append(). The random generator is stored by reference,
 * so the generator given to the constructor must stay alive as long as this object is used.
 */
class NormalMixture : public Sampler {
  std::vector<std::normal_distribution<double>> mixture_; // one normal distribution per component
  std::vector<double> prob_;                              // weight of each component, same indexing as mixture_
  std::mt19937& gen_;                                     // shared generator, not owned

public:
  explicit NormalMixture(std::mt19937& gen) : gen_(gen) {}

  /** @brief Add a component N(mean, stddev) selected with weight prob */
  void append(double mean, double stddev, double prob)
  {
    mixture_.emplace_back(mean, stddev);
    prob_.push_back(prob);
  }

  /** @brief Pick a component according to the stored weights, then draw one value from it */
  double sample() override
  {
    // The selector is rebuilt from the current weights on every call
    std::discrete_distribution<> chooser(prob_.begin(), prob_.end());
    const int picked = chooser(gen_);
    return mixture_[picked](gen_);
  }
};
/**
 * @brief Latency factor callback for a communication
 *
 * Messages with a size below 63305 get a factor of 0.0 (no calibration for small messages).
 * Otherwise the factor is a value sampled from the segmented regression, divided by the base latency.
 *
 * @param latency_base The base latency for this calibration (user-defined)
 * @param seg Segmentation (user-defined)
 * @param size Message size (SimGrid)
 * @return The latency factor to apply
 */
static double latency_factor_cb(double latency_base, const SegmentedRegression& seg, double size,
                                const sg4::Host* /*src*/, const sg4::Host* /*dst*/,
                                const std::vector<sg4::Link*>& /*links*/,
                                const std::unordered_set<sg4::NetZone*>& /*netzones*/)
{
  const bool small_message = size < 63305;
  // The regression is only sampled for messages that are not small
  return small_message ? 0.0 : seg.sample(size) / latency_base;
}
/**
 * @brief Bandwidth factor callback for a communication
 *
 * Messages with a size below 63305 get a factor of 1.0 (no calibration for small messages).
 * Otherwise the estimated bandwidth is the inverse of the coefficient returned by the segmented
 * regression for this size, and the factor is that estimate divided by the base bandwidth.
 *
 * @param bw_base The base bandwidth for this calibration (user-defined)
 * @param seg Segmentation (user-defined)
 * @param size Message size (SimGrid)
 * @return The bandwidth factor to apply
 */
static double bw_factor_cb(double bw_base, const SegmentedRegression& seg, double size, const sg4::Host* /*src*/,
                           const sg4::Host* /*dst*/, const std::vector<sg4::Link*>& /*links*/,
                           const std::unordered_set<sg4::NetZone*>& /*netzones*/)
{
  const bool small_message = size < 63305;
  if (small_message) {
    return 1.0;
  }

  const double estimated_bw = 1.0 / seg.get_coef(size);
  const double factor       = estimated_bw / bw_base;
  return factor;
}
/**
 * @brief Cost callback for an SMPI operation
 *
 * @param seg Segmentation (user-defined)
 * @param size Message size
 * @return A value sampled from the segmented regression for this size
 */
static double smpi_cost_cb(const SegmentedRegression& seg, double size, sg4::Host* /*src*/, sg4::Host* /*dst*/)
{
  const double cost = seg.sample(size);
  return cost;
}
/**
 * @brief Build a SegmentedRegression from a JSON calibration file
 *
 * Every entry under the "seg" node provides "max_x", "coefficient", "mean", "sd" and "prob".
 * Entries sharing the same "max_x" are gathered into a single NormalMixture, and the last
 * "coefficient" read for that "max_x" is the one kept.
 *
 * @param jsonFile Path of the JSON file to parse
 * @param gen Random generator handed to every NormalMixture created here
 * @param read_coef Forwarded to the SegmentedRegression constructor
 * @return The regression holding one (max_x, coefficient, mixture) triple per distinct max_x
 */
static SegmentedRegression read_json_file(const std::string& jsonFile, std::mt19937& gen, bool read_coef = true)
{
  namespace bpt = boost::property_tree;

  bpt::ptree root;
  bpt::read_json(jsonFile, root);

  std::unordered_map<double, std::shared_ptr<NormalMixture>> mixture_by_max;
  std::unordered_map<double, double> coef_by_max;

  printf("Starting parsing file: %s\n", jsonFile.c_str());

  const bpt::ptree& segments = root.get_child("seg"); // go to segments part
  SegmentedRegression seg(read_coef);

  // Reads one numeric field of a segment entry
  const auto number = [](const bpt::ptree& node, const char* key) { return node.get_child(key).get_value<double>(); };

  for (const auto& entry : segments) {
    const bpt::ptree& node = entry.second;
    const double max_x     = number(node, "max_x");
    coef_by_max[max_x]     = number(node, "coefficient");

    // Create the mixture the first time this max_x is met
    std::shared_ptr<NormalMixture>& slot = mixture_by_max[max_x];
    if (slot == nullptr) {
      slot = std::make_shared<NormalMixture>(gen);
    }

    const double mean = number(node, "mean");
    const double sd   = number(node, "sd");
    const double prob = number(node, "prob");
    slot->append(mean, sd, prob);
  }

  for (const auto& item : mixture_by_max) {
    seg.append(item.first, coef_by_max.at(item.first), item.second);
  }

  return seg;
}
/**
 * @brief Programmatic version of dahu
 *
 * Sets the SMPI thresholds, reads the base bandwidth and latency from "pingpong_ckmeans.json",
 * calls load_dahu_platform() with them, installs the latency and bandwidth factor callbacks on the
 * root netzone, and registers the cost callbacks of the SEND, ISEND and RECV SMPI operations from
 * their respective calibration files.
 *
 * @param e The engine being configured
 */
extern "C" void load_platform(const sg4::Engine& e);
void load_platform(const sg4::Engine& e)
{
  namespace ph = std::placeholders;

  // setting threshold sync/async modes
  e.set_config("smpi/async-small-thresh", 63305);
  e.set_config("smpi/send-is-detached-thresh", 63305);

  /* reading bandwidth and latency base value. It is the same for all regressions, get from first file */
  boost::property_tree::ptree calibration;
  boost::property_tree::read_json("pingpong_ckmeans.json", calibration);
  double bw_base  = calibration.get_child("bandwidth_base").get_value<double>();
  double lat_base = calibration.get_child("latency_base").get_value<double>();
  printf("Read bandwidth_base: %e latency_base: %e\n", bw_base, lat_base);

  load_dahu_platform(e, bw_base, lat_base);

  // Static storage: the generator is still needed after this load_platform function is over
  static std::mt19937 gen(42);

  /* setting network factors callbacks */
  auto* root_zone                        = e.get_netzone_root();
  const SegmentedRegression pingpong_seg = read_json_file("pingpong_ckmeans.json", gen, false);
  root_zone->set_latency_factor_cb(
      std::bind(&latency_factor_cb, lat_base, pingpong_seg, ph::_1, ph::_2, ph::_3, ph::_4, ph::_5));
  root_zone->set_bandwidth_factor_cb(
      std::bind(&bw_factor_cb, bw_base, pingpong_seg, ph::_1, ph::_2, ph::_3, ph::_4, ph::_5));

  /* setting SMPI operation cost callbacks: parse one calibration file, then bind it to the operation */
  const auto register_op_cost = [](auto op, const char* calibration_file) {
    const SegmentedRegression op_seg = read_json_file(calibration_file, gen);
    smpi_register_op_cost_callback(op, std::bind(&smpi_cost_cb, op_seg, ph::_1, ph::_2, ph::_3));
  };
  register_op_cost(SmpiOperation::SEND, "send_ckmeans.json");
  register_op_cost(SmpiOperation::ISEND, "isend_ckmeans.json");
  register_op_cost(SmpiOperation::RECV, "recv_ckmeans.json");
}
|