File: testConcurrentQueue.cpp

package info (click to toggle)
rdkit 202503.6-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 222,024 kB
  • sloc: cpp: 411,111; python: 78,482; ansic: 26,181; java: 8,285; javascript: 4,404; sql: 2,393; yacc: 1,626; lex: 1,267; cs: 1,090; makefile: 580; xml: 229; fortran: 183; sh: 121
file content (117 lines) | stat: -rw-r--r-- 2,715 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#ifdef RDK_BUILD_THREADSAFE_SSS
#include <catch2/catch_all.hpp>
#include <RDGeneral/RDLog.h>

#include <functional>
#include <iomanip>
#include <sstream>

#include "ConcurrentQueue.h"

using namespace RDKit;

//! Single-threaded check: three values pushed onto a ConcurrentQueue must come
//! back from pop() in the order they were pushed, and isEmpty() must report
//! true before the pushes and again after the last pop.
TEST_CASE("testPushAndPop") {
  // Automatic storage instead of new/delete: a failing REQUIRE leaves the test
  // case immediately, which would skip a trailing delete and leak the queue.
  ConcurrentQueue<int> q(4);
  REQUIRE(q.isEmpty());

  q.push(1);
  q.push(2);
  q.push(3);

  REQUIRE(!q.isEmpty());

  // Value-initialised so that, should pop() ever report success without
  // writing its out-parameter, the comparisons below fail deterministically
  // rather than reading an indeterminate int.
  int e1 = 0;
  int e2 = 0;
  int e3 = 0;
  REQUIRE(q.pop(e1));
  REQUIRE(q.pop(e2));
  REQUIRE(q.pop(e3));

  REQUIRE(e1 == 1);
  REQUIRE(e2 == 2);
  REQUIRE(e3 == 3);

  REQUIRE(q.isEmpty());
}

//! Producer thread body: pushes the integers 0, 1, ..., numToProduce - 1 onto
//! q, in increasing order. Pushes nothing when numToProduce <= 0.
void produce(ConcurrentQueue<int> &q, const int numToProduce) {
  int next = 0;
  while (next < numToProduce) {
    q.push(next);
    ++next;
  }
}

//! Consumer thread body: keeps popping from q and appends each popped value
//! to result, returning the first time pop() reports false.
void consume(ConcurrentQueue<int> &q, std::vector<int> &result) {
  for (int popped; q.pop(popped);) {
    result.push_back(popped);
  }
}

//! multithreaded testing for ConcurrentQueue
//!
//! Runs numProducerThreads threads executing produce() and numConsumerThreads
//! threads executing consume() against one shared queue (constructed with
//! argument 5 -- presumably its capacity; confirm against ConcurrentQueue.h).
//! Each producer pushes the values 0 .. numToProduce-1, so across all
//! consumers every value must have been popped exactly numProducerThreads
//! times.
//!
//! \param numProducerThreads number of producer threads to start
//! \param numConsumerThreads number of consumer threads to start
//! \return true if every value in [0, numToProduce) was consumed exactly
//!         numProducerThreads times and no other value was seen; false
//!         otherwise
bool testProducerConsumer(const int numProducerThreads,
                          const int numConsumerThreads) {
  ConcurrentQueue<int> q(5);
  REQUIRE(q.isEmpty());

  const int numToProduce = 10;

  std::vector<std::thread> producers(numProducerThreads);
  std::vector<std::thread> consumers(numConsumerThreads);
  // One private result vector per consumer, so consumers never write to the
  // same vector.
  std::vector<std::vector<int>> results(numConsumerThreads);

  //! start producer threads
  for (int i = 0; i < numProducerThreads; i++) {
    producers[i] = std::thread(produce, std::ref(q), numToProduce);
  }
  //! start consumer threads
  for (int i = 0; i < numConsumerThreads; i++) {
    consumers[i] = std::thread(consume, std::ref(q), std::ref(results[i]));
  }

  for (auto &producer : producers) {
    producer.join();
  }

  //! the producer is done producing
  q.setDone();

  for (auto &consumer : consumers) {
    consumer.join();
  }
  REQUIRE(q.isEmpty());

  std::vector<int> frequency(numToProduce, 0);
  for (const auto &result : results) {
    for (const int element : result) {
      // The values come from the queue under test: a value outside the
      // produced range is a test failure, and must not be used as an index.
      if (element < 0 || element >= numToProduce) {
        return false;
      }
      frequency[element] += 1;
    }
  }
  for (const int freq : frequency) {
    if (freq != numProducerThreads) {
      return false;
    }
  }
  return true;
}

//! Stress test: repeats testProducerConsumer() many times for each of four
//! producer/consumer thread-count combinations and requires every run to
//! succeed.
TEST_CASE("testMultipleTimes") {
  const int trials = 10000;

  //! one producer feeding one consumer
  for (int run = 0; run != trials; ++run) {
    REQUIRE(testProducerConsumer(1, 1));
  }

  //! one producer feeding five consumers
  for (int run = 0; run != trials; ++run) {
    REQUIRE(testProducerConsumer(1, 5));
  }

  //! five producers feeding one consumer
  for (int run = 0; run != trials; ++run) {
    REQUIRE(testProducerConsumer(5, 1));
  }

  //! two producers feeding four consumers
  for (int run = 0; run != trials; ++run) {
    REQUIRE(testProducerConsumer(2, 4));
  }
}
#endif