1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
#ifdef RDK_BUILD_THREADSAFE_SSS
#include <catch2/catch_all.hpp>
#include <RDGeneral/RDLog.h>
#include <functional>
#include <iomanip>
#include <sstream>
#include "ConcurrentQueue.h"
using namespace RDKit;
//! Single-threaded sanity check: three values pushed onto the queue are
//! popped back in the order they were pushed, and isEmpty() reflects the
//! queue contents before, during and after.
TEST_CASE("testPushAndPop") {
  // Automatic storage instead of new/delete: a failing REQUIRE aborts the
  // test case immediately, which would skip a trailing delete and leak the
  // queue.
  ConcurrentQueue<int> q(4);
  // Initialized so a failed pop never leaves an indeterminate value to be
  // compared below.
  int e1 = 0;
  int e2 = 0;
  int e3 = 0;

  REQUIRE(q.isEmpty());
  q.push(1);
  q.push(2);
  q.push(3);
  REQUIRE(!q.isEmpty());

  REQUIRE(q.pop(e1));
  REQUIRE(q.pop(e2));
  REQUIRE(q.pop(e3));

  // Values must come out in insertion order.
  REQUIRE(e1 == 1);
  REQUIRE(e2 == 2);
  REQUIRE(e3 == 3);
  REQUIRE(q.isEmpty());
}
//! Producer worker: pushes the integers 0, 1, ..., numToProduce - 1 onto
//! \c q, in increasing order.
//!
//! \param q             queue shared with the other worker threads
//! \param numToProduce  number of consecutive integers to push; nothing is
//!                      pushed when this is zero or negative
void produce(ConcurrentQueue<int> &q, const int numToProduce) {
  int next = 0;
  while (next < numToProduce) {
    q.push(next);
    ++next;
  }
}
//! Consumer worker: keeps popping from \c q and appends every popped value
//! to \c result, stopping the first time pop() returns false.
//!
//! \param q       queue shared with the other worker threads
//! \param result  receives the popped values, in the order this worker
//!                obtained them
void consume(ConcurrentQueue<int> &q, std::vector<int> &result) {
  for (int value; q.pop(value);) {
    result.push_back(value);
  }
}
//! Multithreaded test for ConcurrentQueue.
//!
//! Runs \c numProducerThreads threads executing produce() and
//! \c numConsumerThreads threads executing consume() against one shared
//! queue. Every producer pushes the values 0 .. numToProduce - 1, so once
//! all threads have finished each of those values must have been consumed
//! exactly \c numProducerThreads times in total.
//!
//! \param numProducerThreads  number of producer threads to start
//! \param numConsumerThreads  number of consumer threads to start
//! \return true if every value was consumed exactly \c numProducerThreads
//!         times and no out-of-range value was seen; false otherwise
bool testProducerConsumer(const int numProducerThreads,
                          const int numConsumerThreads) {
  ConcurrentQueue<int> q(5);
  REQUIRE(q.isEmpty());

  const int numToProduce = 10;
  std::vector<std::thread> producers(numProducerThreads);
  std::vector<std::thread> consumers(numConsumerThreads);
  // One result vector per consumer, so no two threads write to the same one.
  std::vector<std::vector<int>> results(numConsumerThreads);

  //! start producer threads
  for (int i = 0; i < numProducerThreads; i++) {
    producers[i] = std::thread(produce, std::ref(q), numToProduce);
  }
  //! start consumer threads
  for (int i = 0; i < numConsumerThreads; i++) {
    consumers[i] = std::thread(consume, std::ref(q), std::ref(results[i]));
  }

  // Plain loops rather than std::for_each + std::mem_fn(&std::thread::join):
  // forming a pointer to a standard-library member function is not
  // sanctioned by the standard.
  for (auto &producer : producers) {
    producer.join();
  }
  //! the producer is done producing
  q.setDone();
  for (auto &consumer : consumers) {
    consumer.join();
  }
  REQUIRE(q.isEmpty());

  // Tally how many times each value was consumed across all consumers.
  std::vector<int> frequency(numToProduce, 0);
  for (const auto &result : results) {
    for (const int element : result) {
      // The values come from the queue under test; report an unexpected
      // value as a failure instead of indexing out of bounds.
      if (element < 0 || element >= numToProduce) {
        return false;
      }
      frequency[element] += 1;
    }
  }
  for (const int freq : frequency) {
    if (freq != numProducerThreads) {
      return false;
    }
  }
  return true;
}
//! Repeats the producer/consumer test many times for each thread
//! configuration, since a threading fault may only show up on some runs.
TEST_CASE("testMultipleTimes") {
  const int trials = 10000;
  // Invokes `check` `count` times in a row.
  const auto repeat = [](const int count, const auto &check) {
    for (int trial = 0; trial < count; ++trial) {
      check();
    }
  };

  //! Single Producer, Single Consumer
  repeat(trials, [] { REQUIRE(testProducerConsumer(1, 1)); });
  //! Single Producer, Multiple Consumer
  repeat(trials, [] { REQUIRE(testProducerConsumer(1, 5)); });
  //! Multiple Producer, Single Consumer
  repeat(trials, [] { REQUIRE(testProducerConsumer(5, 1)); });
  //! Multiple Producer, Multiple Consumer
  repeat(trials, [] { REQUIRE(testProducerConsumer(2, 4)); });
}
#endif
|