File: execution.cpp

package info (click to toggle)
xenium 0.0.2%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,088 kB
  • sloc: cpp: 12,297; makefile: 20
file content (174 lines) | stat: -rw-r--r-- 4,734 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#include "execution.hpp"

#include <tao/config/value.hpp>

#ifdef WITH_LIBCDS
#include <cds/gc/hp.h>
#endif

#include <iostream>

using config_t = tao::config::value;

execution::execution(std::uint32_t round, std::uint32_t runtime, std::shared_ptr<benchmark> benchmark) :
  _state(execution_state::starting),
  _round(round),
  _runtime(runtime),
  _benchmark(std::move(benchmark))
{}

execution::~execution() {
  _state.store(execution_state::stopped);
  for (auto& thread : _threads)
    if (thread->_thread.joinable())
      thread->_thread.join();
}

void execution::create_threads(const config_t& config) {
  std::uint32_t total_count = 0;
  for (auto& it : config.get_object())
    total_count += it.second.optional<std::uint32_t>("count").value_or(1);

  _threads.reserve(total_count);

  std::uint32_t cnt = 0;
  for (auto& it : config.get_object()) {
    auto count = it.second.optional<std::uint32_t>("count").value_or(1);
    for (std::uint32_t i = 0; i < count; ++i, ++cnt) {
      auto type = it.second.optional<std::string>("type").value_or(it.first);
      auto id = (_round << thread_id_bits) | cnt;
      auto thread = _benchmark->create_thread(id, *this, type);
      _threads.push_back(std::move(thread));
      _threads.back()->setup(it.second);
    }
  }
}

execution_state execution::state(std::memory_order order) const {
  return _state.load(order);
}

round_report execution::run() {
  _state.store(execution_state::preparing);

  wait_until_all_threads_are(thread_state::running);

  _state.store(execution_state::initializing);

  wait_until_all_threads_are(thread_state::ready);

  _state.store(execution_state::running);

  auto start = std::chrono::high_resolution_clock::now();

  std::this_thread::sleep_for(std::chrono::milliseconds(_runtime));

  _state.store(execution_state::stopped);

  wait_until_all_threads_are(thread_state::finished);

  std::chrono::duration<double, std::milli> runtime = std::chrono::high_resolution_clock::now() - start;
  return build_report(runtime.count());
}

round_report execution::build_report(double runtime) {
  for (auto& thread : _threads)
    thread->_thread.join();
  
  std::vector<thread_report> thread_reports;
  thread_reports.reserve(_threads.size());
  for (unsigned i = 0; i < _threads.size(); ++i) {
    thread_reports.push_back(_threads[i]->report());
  }
  return { thread_reports, runtime };
}

void execution::wait_until_all_threads_are(thread_state state) {
  for (auto& thread : _threads)
    wait_until_thread_state_is(*thread, state);
}

void execution::wait_until_thread_state_is(const execution_thread& thread, thread_state expected) const {
  auto state = thread._state.load(std::memory_order_relaxed);
  while (state != expected) {
    if (state == thread_state::finished)
      throw std::runtime_error("worker thread finished prematurely");
    state = thread._state.load(std::memory_order_relaxed);
  }
}

execution_thread::execution_thread(std::uint32_t id, const execution& exec) :
  _execution(exec),
  _id(id),
  _thread(&execution_thread::thread_func, this)
{}

void execution_thread::thread_func() {

#ifdef WITH_LIBCDS
  cds::threading::Manager::attachThread();
#endif

  wait_until_all_threads_are_started();
  try {
    do_run();
  } catch (std::exception& e) {
    std::cout << "Thread " << std::this_thread::get_id() << " failed: " << e.what() << std::endl;
  }
  _state.store(thread_state::finished);

#ifdef WITH_LIBCDS
  cds::threading::Manager::detachThread();
#endif
}

void execution_thread::do_run() {
  if (_execution.state(std::memory_order_relaxed) == execution_state::stopped)
    return;

  _state.store(thread_state::running);

  wait_until_initialization();

  initialize(_execution.num_threads());

  _state.store(thread_state::ready);

  wait_until_benchmark_starts();

  auto start = std::chrono::high_resolution_clock::now();

  while (_execution.state() == execution_state::running)
    run();

  _runtime = std::chrono::high_resolution_clock::now() - start;
}

void execution_thread::setup(const config_t& config) {
  auto workload = config.find("workload");
  if (!workload)
    return;
  
  workload_factory factory;
  _workload = factory(*workload);
}

void execution_thread::simulate_workload() {
  if (_workload)
    _workload->simulate();
}

void execution_thread::wait_until_all_threads_are_started() {
  while (_execution.state(std::memory_order_acquire) == execution_state::starting)
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
}

void execution_thread::wait_until_initialization() {
  while (_execution.state() == execution_state::preparing)
    ;
}

void execution_thread::wait_until_benchmark_starts() {
  while (_execution.state() == execution_state::initializing)
    ;
}