1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
//
// Copyright (C) 2020 Shrey Aryan
//
// @@ All Rights Reserved @@
// This file is part of the RDKit.
// The contents are covered by the terms of the BSD license
// which is included in the file license.txt, found at the root
// of the RDKit source tree.
//
#ifdef RDK_BUILD_THREADSAFE_SSS
#ifndef CONCURRENT_QUEUE
#define CONCURRENT_QUEUE
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
namespace RDKit {
//! A bounded first-in/first-out queue shared between threads.
//! Elements live in a vector of d_capacity slots that is indexed modulo
//! d_capacity; d_head and d_tail count the elements popped and pushed so far.
//! Every member function takes d_lock before touching the state.
template <typename E>
class ConcurrentQueue {
 private:
  //! maximum number of elements the queue holds at once
  unsigned int d_capacity;
  //! once true, pop() on an empty queue returns false instead of blocking
  bool d_done;
  //! slot storage, sized to d_capacity by the constructor
  std::vector<E> d_elements;
  //! running counters: d_head is advanced by pop(), d_tail by push()
  unsigned int d_head, d_tail;
  //! mutable so that the const accessors can lock it
  mutable std::mutex d_lock;
  //! d_notEmpty wakes consumers waiting in pop(),
  //! d_notFull wakes producers waiting in push()
  std::condition_variable d_notEmpty, d_notFull;

 public:
  //! the queue owns a mutex and condition variables, so it is not copyable
  ConcurrentQueue(const ConcurrentQueue<E>&) = delete;
  ConcurrentQueue& operator=(const ConcurrentQueue<E>&) = delete;

  //! creates an empty queue with room for \c capacity elements;
  //! the slots are default-constructed in place (no temporary vector, no
  //! element copies)
  ConcurrentQueue(unsigned int capacity)
      : d_capacity(capacity),
        d_done(false),
        d_elements(capacity),
        d_head(0),
        d_tail(0) {}

  //! tries to push an element into the queue if it is not full without
  //! modifying the variable element, if the queue is full then pushing an
  //! element will result in blocking
  void push(const E& element);

  //! tries to pop an element from the queue if it is not empty and not done
  //! the boolean value indicates whether popping was successful, if the
  //! queue is empty and not done then popping an element will result in
  //! blocking
  bool pop(E& element);

  //! checks whether the ConcurrentQueue is empty
  bool isEmpty() const;

  //! returns the value of the variable done
  bool getDone() const;

  //! sets the variable d_done = true
  void setDone();

  //! discards the stored elements
  void clear();
};
//! Stores a copy of \c element in the next free slot of the queue.
//! The calling thread blocks for as long as the queue already holds
//! d_capacity elements.
template <typename E>
void ConcurrentQueue<E>::push(const E& element) {
  std::unique_lock<std::mutex> guard(d_lock);
  //! sleep until a consumer has freed at least one slot
  d_notFull.wait(guard, [this]() { return d_tail != d_head + d_capacity; });

  //! remember the state before inserting: consumers can only be waiting
  //! when there was nothing to pop
  const bool hadNoElements = (d_tail == d_head);
  const unsigned int slot = d_tail % d_capacity;
  d_elements.at(slot) = element;
  ++d_tail;

  //! the queue just went from empty to non-empty, so wake every consumer
  if (hadNoElements) {
    d_notEmpty.notify_all();
  }
}
//! Copies the oldest queued element into \c element and removes it.
//! Blocks while the queue is empty and d_done has not been set.
//! \return true if an element was delivered, false if the queue is empty
//!         and d_done is set (in which case \c element is left untouched)
template <typename E>
bool ConcurrentQueue<E>::pop(E& element) {
  std::unique_lock<std::mutex> guard(d_lock);
  //! sleep until there is something to pop or the queue has been marked done
  d_notEmpty.wait(guard, [this]() { return d_head != d_tail || d_done; });
  if (d_head == d_tail) {
    //! still empty, so the wait ended only because d_done is set
    return false;
  }

  //! remember the state before removing: producers can only be waiting
  //! when every slot was occupied
  const bool hadNoRoom = (d_tail == d_head + d_capacity);
  const unsigned int slot = d_head % d_capacity;
  element = d_elements.at(slot);
  ++d_head;

  //! the queue just went from full to not full, so wake every producer
  if (hadNoRoom) {
    d_notFull.notify_all();
  }
  return true;
}
//! Reports whether the queue currently holds no elements, i.e. whether the
//! push and pop counters coincide. The check is made while holding d_lock.
template <typename E>
bool ConcurrentQueue<E>::isEmpty() const {
  std::unique_lock<std::mutex> guard{d_lock};
  const bool nothingQueued = (d_tail == d_head);
  return nothingQueued;
}
//! Returns the current value of the d_done flag, read while holding d_lock.
template <typename E>
bool ConcurrentQueue<E>::getDone() const {
  std::unique_lock<std::mutex> guard{d_lock};
  const bool finished = d_done;
  return finished;
}
//! Marks the queue as done and wakes every thread waiting in pop(), so that
//! consumers blocked on an empty queue can observe the flag and return.
template <typename E>
void ConcurrentQueue<E>::setDone() {
  std::unique_lock<std::mutex> guard{d_lock};
  d_done = true;
  //! the flag is part of the condition pop() waits on, so signal d_notEmpty
  d_notEmpty.notify_all();
}
//! Discards every stored element and returns the queue to its empty state.
//! The slot buffer is restored to d_capacity default-constructed elements and
//! the push/pop counters are reset, so that isEmpty() reports true and later
//! push()/pop() calls keep indexing inside the buffer. The d_done flag is not
//! modified.
template <typename E>
void ConcurrentQueue<E>::clear() {
  std::unique_lock<std::mutex> lk(d_lock);
  //! destroy the stored elements, then bring the buffer back to d_capacity
  //! slots; leaving it at size 0 would make every later at() lookup in
  //! push()/pop() throw std::out_of_range
  d_elements.clear();
  d_elements.resize(d_capacity);
  //! head == tail is what the rest of the class treats as "empty"
  d_head = 0;
  d_tail = 0;
  //! every slot is free again, so producers blocked on a full queue may go on
  d_notFull.notify_all();
}
} // namespace RDKit
#endif
#endif
|