1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
#include "execution.hpp"
#include <tao/config/value.hpp>
#ifdef WITH_LIBCDS
#include <cds/gc/hp.h>
#endif
#include <iostream>
using config_t = tao::config::value;
execution::execution(std::uint32_t round, std::uint32_t runtime, std::shared_ptr<benchmark> benchmark) :
_state(execution_state::starting),
_round(round),
_runtime(runtime),
_benchmark(std::move(benchmark))
{}
/// Stops the execution and joins every worker thread that is still joinable.
execution::~execution() {
  // Switching to `stopped` makes every worker wait loop exit, so the joins below terminate.
  _state.store(execution_state::stopped);
  for (const auto& worker : _threads) {
    auto& handle = worker->_thread;
    if (!handle.joinable())
      continue;
    handle.join();
  }
}
void execution::create_threads(const config_t& config) {
std::uint32_t total_count = 0;
for (auto& it : config.get_object())
total_count += it.second.optional<std::uint32_t>("count").value_or(1);
_threads.reserve(total_count);
std::uint32_t cnt = 0;
for (auto& it : config.get_object()) {
auto count = it.second.optional<std::uint32_t>("count").value_or(1);
for (std::uint32_t i = 0; i < count; ++i, ++cnt) {
auto type = it.second.optional<std::string>("type").value_or(it.first);
auto id = (_round << thread_id_bits) | cnt;
auto thread = _benchmark->create_thread(id, *this, type);
_threads.push_back(std::move(thread));
_threads.back()->setup(it.second);
}
}
}
/// Returns the current execution phase, loaded with the caller-chosen memory order.
execution_state execution::state(std::memory_order order) const {
  const execution_state snapshot = _state.load(order);
  return snapshot;
}
/// Drives one benchmark round through its phases and returns the round report.
///
/// Phases:
///   1. preparing    — wait until every worker reports `running`.
///   2. initializing — wait until every worker reports `ready`.
///   3. running      — sleep for `_runtime` milliseconds.
///   4. stopped      — wait until every worker reports `finished`.
/// The reported runtime is measured from the start of the running phase until
/// all workers have finished.
///
/// @throws rethrows any exception raised while waiting or reporting (for example
///         the std::runtime_error thrown when a worker finishes prematurely).
///         Before rethrowing, the execution is switched to `stopped`, so the
///         remaining workers leave their busy-wait loops instead of spinning
///         until this object is destroyed.
round_report execution::run() {
  try {
    _state.store(execution_state::preparing);
    wait_until_all_threads_are(thread_state::running);
    _state.store(execution_state::initializing);
    wait_until_all_threads_are(thread_state::ready);

    _state.store(execution_state::running);
    // steady_clock is monotonic. high_resolution_clock may alias the adjustable
    // system_clock, which makes it unsuitable for measuring intervals.
    const auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(_runtime));
    _state.store(execution_state::stopped);
    wait_until_all_threads_are(thread_state::finished);
    const std::chrono::duration<double, std::milli> runtime = std::chrono::steady_clock::now() - start;
    return build_report(runtime.count());
  } catch (...) {
    _state.store(execution_state::stopped);
    throw;
  }
}
/// Joins all worker threads and collects their reports.
///
/// @param runtime  Round duration in milliseconds (run() passes the measured value).
/// @return the per-thread reports, in thread-creation order, together with `runtime`.
round_report execution::build_report(double runtime) {
  // Guard with joinable(): joining a thread that is not joinable throws
  // std::system_error. The destructor applies the same guard.
  for (auto& thread : _threads)
    if (thread->_thread.joinable())
      thread->_thread.join();

  std::vector<thread_report> thread_reports;
  thread_reports.reserve(_threads.size());
  for (const auto& thread : _threads)
    thread_reports.push_back(thread->report());
  // The local vector is no longer needed, so hand it over instead of copying it.
  return { std::move(thread_reports), runtime };
}
/// Blocks until every worker has reached `state`, checking workers one after
/// another in creation order.
void execution::wait_until_all_threads_are(thread_state state) {
  for (auto it = _threads.begin(); it != _threads.end(); ++it)
    wait_until_thread_state_is(**it, state);
}
/// Busy-waits until `thread` reports `expected`.
///
/// @throws std::runtime_error if the worker reaches `finished` first. A finished
///         worker never changes state again, so it would never get there.
void execution::wait_until_thread_state_is(const execution_thread& thread, thread_state expected) const {
  for (auto observed = thread._state.load(std::memory_order_relaxed);
       observed != expected;
       observed = thread._state.load(std::memory_order_relaxed)) {
    if (observed == thread_state::finished)
      throw std::runtime_error("worker thread finished prematurely");
  }
}
// Creates a worker bound to `exec` with the given `id` and immediately launches
// its OS thread, which runs thread_func() on this object.
//
// NOTE(review): the std::thread starts from the member-initializer list, i.e.
// before this object (and any derived object, if this is used as a base) has
// finished construction. That is tolerable because thread_func() first parks in
// wait_until_all_threads_are_started() while the execution is still `starting`.
// It also relies on `_execution` being declared before `_thread` in
// execution.hpp, since member initialization follows declaration order.
// TODO confirm that ordering.
execution_thread::execution_thread(std::uint32_t id, const execution& exec) :
_execution(exec),
_id(id),
_thread(&execution_thread::thread_func, this)
{}
/// Entry point of the worker's OS thread.
///
/// Steps:
///   - registers the thread with libcds (when built WITH_LIBCDS);
///   - waits for the execution to leave the `starting` state;
///   - runs the benchmark phases via do_run();
///   - publishes `thread_state::finished`.
/// An exception escaping a std::thread entry point calls std::terminate.
/// Every failure is therefore caught and reported here, and the thread always
/// ends in `finished`.
void execution_thread::thread_func() {
#ifdef WITH_LIBCDS
  cds::threading::Manager::attachThread();
#endif
  wait_until_all_threads_are_started();
  try {
    do_run();
  } catch (const std::exception& e) {
    std::cout << "Thread " << std::this_thread::get_id() << " failed: " << e.what() << std::endl;
  } catch (...) {
    // Non-std exceptions used to escape this function and terminate the process.
    std::cout << "Thread " << std::this_thread::get_id() << " failed: unknown exception" << std::endl;
  }
  _state.store(thread_state::finished);
#ifdef WITH_LIBCDS
  cds::threading::Manager::detachThread();
#endif
}
/// Walks this worker through the benchmark phases.
///
/// Sequence:
///   - report `running`, then wait out `preparing`;
///   - initialize, report `ready`, then wait out `initializing`;
///   - call run() repeatedly while the execution stays `running`, recording the
///     elapsed time in `_runtime`.
/// If the execution is already `stopped` when the worker is released, all
/// phases are skipped.
void execution_thread::do_run() {
  if (_execution.state(std::memory_order_relaxed) != execution_state::stopped) {
    _state.store(thread_state::running);
    wait_until_initialization();
    initialize(_execution.num_threads());
    _state.store(thread_state::ready);
    wait_until_benchmark_starts();

    const auto began = std::chrono::high_resolution_clock::now();
    for (;;) {
      if (_execution.state() != execution_state::running)
        break;
      run();
    }
    _runtime = std::chrono::high_resolution_clock::now() - began;
  }
}
void execution_thread::setup(const config_t& config) {
auto workload = config.find("workload");
if (!workload)
return;
workload_factory factory;
_workload = factory(*workload);
}
/// Runs one simulation step of the configured workload, if any.
void execution_thread::simulate_workload() {
  if (!_workload)
    return;
  _workload->simulate();
}
/// Parks the worker until the execution leaves the `starting` state, polling
/// every 20 ms.
void execution_thread::wait_until_all_threads_are_started() {
  const auto poll_interval = std::chrono::milliseconds(20);
  for (;;) {
    if (_execution.state(std::memory_order_acquire) != execution_state::starting)
      return;
    std::this_thread::sleep_for(poll_interval);
  }
}
/// Busy-waits (no sleep or yield) while the execution is in the `preparing` phase.
void execution_thread::wait_until_initialization() {
  for (;;) {
    if (_execution.state() != execution_state::preparing)
      break;
  }
}
/// Busy-waits (no sleep or yield) while the execution is in the `initializing` phase.
void execution_thread::wait_until_benchmark_starts() {
  for (;;) {
    if (_execution.state() != execution_state::initializing)
      break;
  }
}
|