#include <aocommon/overlappingtaskprocessor.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <semaphore>
#include <thread>
#include <vector>

#include <boost/test/unit_test.hpp>
using aocommon::OverlappingTaskProcessor;
BOOST_AUTO_TEST_SUITE(overlappingtaskprocessor)
// A single chunk of simulated input: identifies the lane (input stream) it
// belongs to and the task index within that lane. Constructed via C++20
// parenthesized aggregate initialization, so this must stay an aggregate.
struct TestTask {
  size_t lane;
  size_t task;
};
// One recorded processing event for ordering checks: lane/task identify the
// chunk; phase is 0 for the "processing started" record and
// 1..kNumProcessingPhasesPerTask for each completed processing phase.
// Constructed via C++20 parenthesized aggregate initialization, so this must
// stay an aggregate.
struct TestWorkPhase {
  size_t lane;
  size_t task;
  size_t phase;
};
BOOST_AUTO_TEST_CASE(multiple_threads) {
  // Verifies that OverlappingTaskProcessor, fed from a shared TaskQueue with
  // several worker threads, (a) overlaps the processing of at most 2 lanes,
  // (b) starts tasks within a lane in order, and (c) processes the phases of
  // each task strictly in order.
  const size_t kNumSimulatedLanes = 100;
  const size_t kNumTasksPerLane = 10;
  const size_t kNumProcessingPhasesPerTask = 3;
  const size_t kNumThreads = 3;

  // Per-thread RNG for the simulated read/processing delays. The previous
  // code seeded an std::mt19937 that was never used and instead drew directly
  // from a single std::random_device that was shared by the detached writer
  // threads and the worker threads — a data race on the RNG and distribution
  // state. A thread_local generator removes both the dead code and the race.
  auto random_sleep_us = [](int min_us, int max_us) {
    thread_local std::mt19937 generator(std::random_device{}());
    return std::uniform_int_distribution<>(min_us, max_us)(generator);
  };

  // Single queue with threads for processing all work.
  aocommon::TaskQueue<std::function<void()>> run_task_queue;
  std::vector<std::thread> run_thread_pool;
  run_thread_pool.reserve(kNumThreads);
  for (size_t i = 0; i < kNumThreads; ++i) {
    run_thread_pool.emplace_back([&] {
      std::function<void()> operation;
      while (run_task_queue.Pop(operation)) {
        operation();
      }
    });
  }
  // Wrapped in an overlapped processor to allow overlaps.
  OverlappingTaskProcessor overlapping_processor(run_task_queue);
  std::mutex processing_order_mutex;
  std::vector<TestWorkPhase> processing_order;
  std::vector<std::mutex> phase_mutex(kNumProcessingPhasesPerTask);
  // Track concurrency. max_concurrent_phases is only read/written while
  // holding processing_order_mutex; the counter itself is atomic because it
  // is incremented/decremented outside the lock.
  size_t max_concurrent_phases = 0;
  std::atomic<size_t> current_concurrent_phases = 0;
  // Simulate reading in multiple chunks for multiple tasks with a lane for each
  // task. Call Process() for each task. Limit Process() calls to maximum 2 in
  // parallel.
  std::counting_semaphore<2> task_semaphore(2);
  for (size_t lane = 0; lane < kNumSimulatedLanes; ++lane) {
    task_semaphore.acquire();
    // NOTE(review): task_lane (and the loop variable `lane`) are captured by
    // reference in the detached writer thread below. This assumes Process()
    // does not return until the lane has been fully written and drained —
    // confirm against OverlappingTaskProcessor's contract.
    aocommon::Lane<TestTask> task_lane(kNumTasksPerLane / 2);
    std::thread([&]() {
      for (size_t task = 0; task < kNumTasksPerLane; ++task) {
        TestTask chunk_data(lane, task);
        // Simulate data read time.
        std::this_thread::sleep_for(
            std::chrono::microseconds(random_sleep_us(1, 50)));
        task_lane.write(std::move(chunk_data));
      }
      task_lane.write_end();
    }).detach();
    overlapping_processor.Process<TestTask>(
        task_lane,
        [&](TestTask&& chunk_data, size_t chunk_index,
            std::binary_semaphore& processing_order_semaphore) mutable {
          size_t lane = chunk_data.lane;
          size_t task = chunk_data.task;
          current_concurrent_phases++;
          {
            std::lock_guard<std::mutex> lock_processing_order(
                processing_order_mutex);
            max_concurrent_phases = std::max(
                max_concurrent_phases, size_t{current_concurrent_phases});
            // Phase 0 marks "processing of this task has started".
            processing_order.push_back(TestWorkPhase(lane, task, 0));
          }
          {
            // Ensure lock transition between phases without a race in between.
            // When we assign to the unique_ptr the old lock will only be
            // destroyed after the new one is created so we are guaranteed to
            // gain the new lock before other threads can obtain the old one.
            std::unique_ptr<std::lock_guard<std::mutex>> phase_lock;
            for (size_t phase = 1; phase <= kNumProcessingPhasesPerTask;
                 ++phase) {
              phase_lock = std::make_unique<std::lock_guard<std::mutex>>(
                  phase_mutex[phase - 1]);
              if (phase == 1) {
                // Holding the first phase lock, allow the processor to start
                // the next task in order.
                processing_order_semaphore.release();
              }
              // Simulate task processing time.
              std::this_thread::sleep_for(
                  std::chrono::microseconds(random_sleep_us(50, 150)));
              {
                std::lock_guard<std::mutex> lock_processing_order(
                    processing_order_mutex);
                processing_order.push_back(TestWorkPhase(lane, task, phase));
              }
            }
          }
          current_concurrent_phases--;
          task_semaphore.release();
        },
        "TestTask");
  }
  // Wait for all processing to complete and cleanup threads.
  run_task_queue.WaitForIdle(kNumThreads);
  run_task_queue.Finish();
  for (std::thread& thread : run_thread_pool) {
    thread.join();
  }
  // There should have been 2 tasks running concurrently throughout most of this
  // test.
  BOOST_CHECK_EQUAL(max_concurrent_phases, 2);
  // Ensure later tasks of later lanes never precede earlier tasks of earlier
  // lanes. (is_sorted only compares adjacent records here; the comparator is
  // deliberately the partial "strictly later in both fields" check.)
  BOOST_CHECK(
      std::is_sorted(processing_order.begin(), processing_order.end(),
                     [](const TestWorkPhase& p1, const TestWorkPhase& p2) {
                       return p1.lane < p2.lane && p1.task < p2.task;
                     }));
  for (size_t lane = 0; lane < kNumSimulatedLanes; ++lane) {
    std::vector<TestWorkPhase> lane_processing_order;
    std::copy_if(processing_order.begin(), processing_order.end(),
                 std::back_inserter(lane_processing_order),
                 [&](TestWorkPhase item) { return item.lane == lane; });
    // Ensure phases within a single task are always processed in order.
    for (size_t task = 0; task < kNumTasksPerLane; ++task) {
      std::vector<TestWorkPhase> task_processing_order;
      std::copy_if(lane_processing_order.begin(), lane_processing_order.end(),
                   std::back_inserter(task_processing_order),
                   [&](TestWorkPhase item) { return item.task == task; });
      BOOST_CHECK(std::is_sorted(
          task_processing_order.begin(), task_processing_order.end(),
          [](const TestWorkPhase& p1, const TestWorkPhase& p2) {
            return p1.phase < p2.phase;
          }));
    }
    // Ensure later phases of later tasks never precede earlier phases of
    // earlier tasks.
    BOOST_CHECK(std::is_sorted(
        lane_processing_order.begin(), lane_processing_order.end(),
        [](const TestWorkPhase& p1, const TestWorkPhase& p2) {
          return p1.task < p2.task && p1.phase < p2.phase;
        }));
    std::vector<TestWorkPhase> lane_start_processing_order;
    std::copy_if(lane_processing_order.begin(), lane_processing_order.end(),
                 std::back_inserter(lane_start_processing_order),
                 [&](TestWorkPhase item) { return item.phase == 0; });
    // Ensure tasks within a lane are always started in order.
    BOOST_CHECK(std::is_sorted(
        lane_start_processing_order.begin(), lane_start_processing_order.end(),
        [](const TestWorkPhase& p1, const TestWorkPhase& p2) {
          return p1.task < p2.task;
        }));
  }
}
BOOST_AUTO_TEST_SUITE_END()