File: ConcurrentQueue.h

package info (click to toggle)
rdkit 202209.3-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 203,880 kB
  • sloc: cpp: 334,239; python: 80,247; ansic: 24,579; java: 7,667; sql: 2,123; yacc: 1,884; javascript: 1,358; lex: 1,260; makefile: 576; xml: 229; fortran: 183; cs: 181; sh: 101
file content (132 lines) | stat: -rw-r--r-- 3,454 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
//
//  Copyright (C) 2020 Shrey Aryan
//
//   @@ All Rights Reserved @@
//  This file is part of the RDKit.
//  The contents are covered by the terms of the BSD license
//  which is included in the file license.txt, found at the root
//  of the RDKit source tree.
//
#ifdef RDK_BUILD_THREADSAFE_SSS
#ifndef CONCURRENT_QUEUE
#define CONCURRENT_QUEUE
#include <condition_variable>
#include <thread>
#include <vector>

namespace RDKit {
template <typename E>
class ConcurrentQueue {
 private:
  unsigned int d_capacity;
  bool d_done;
  std::vector<E> d_elements;
  unsigned int d_head, d_tail;
  mutable std::mutex d_lock;
  std::condition_variable d_notEmpty, d_notFull;

 private:
  ConcurrentQueue(const ConcurrentQueue<E>&);
  ConcurrentQueue& operator=(const ConcurrentQueue<E>&);

 public:
  ConcurrentQueue(unsigned int capacity)
      : d_capacity(capacity), d_done(false), d_head(0), d_tail(0) {
    std::vector<E> elements(capacity);
    d_elements = elements;
  }

  //! tries to push an element into the queue if it is not full without
  //! modifying the variable element, if the queue is full then pushing an
  //! element will result in blocking
  void push(const E& element);

  //! tries to pop an element from the queue if it is not empty and not done
  //! the boolean value indicates the whether popping is successful, if the
  //! queue is empty and not done then popping an element will result in
  //! blocking
  bool pop(E& element);

  //! checks whether the ConcurrentQueue is empty
  bool isEmpty() const;

  //! returns the value of the variable done
  bool getDone() const;

  //! sets the variable d_done = true
  void setDone();

  //! clears the vector
  void clear();
};

template <typename E>
void ConcurrentQueue<E>::push(const E& element) {
  std::unique_lock<std::mutex> lk(d_lock);
  //! concurrent queue is full so we wait until
  //! it is not full
  while (d_head + d_capacity == d_tail) {
    d_notFull.wait(lk);
  }
  bool wasEmpty = (d_head == d_tail);
  d_elements.at(d_tail % d_capacity) = element;
  d_tail++;
  //! if the concurrent queue was empty before
  //! then it is not any more since we have "pushed" an element
  //! thus we notify all the consumer threads
  if (wasEmpty) {
    d_notEmpty.notify_all();
  }
}

template <typename E>
bool ConcurrentQueue<E>::pop(E& element) {
  std::unique_lock<std::mutex> lk(d_lock);
  //! concurrent queue is empty so we wait until
  //! it is not empty
  while (d_head == d_tail) {
    if (d_done) {
      return false;
    }
    d_notEmpty.wait(lk);
  }
  bool wasFull = (d_head + d_capacity == d_tail);
  element = d_elements.at(d_head % d_capacity);
  d_head++;
  //! if the concurrent queue was full before
  //! then it is not any more since we have "popped" an element
  //! thus we notify all producer threads
  if (wasFull) {
    d_notFull.notify_all();
  }
  return true;
}

template <typename E>
bool ConcurrentQueue<E>::isEmpty() const {
  std::unique_lock<std::mutex> lk(d_lock);
  return (d_head == d_tail);
}

template <typename E>
bool ConcurrentQueue<E>::getDone() const {
  std::unique_lock<std::mutex> lk(d_lock);
  return d_done;
}

template <typename E>
void ConcurrentQueue<E>::setDone() {
  std::unique_lock<std::mutex> lk(d_lock);
  d_done = true;
  d_notEmpty.notify_all();
}

template <typename E>
void ConcurrentQueue<E>::clear() {
  std::unique_lock<std::mutex> lk(d_lock);
  d_elements.clear();
}

}  // namespace RDKit
#endif
#endif