1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
|
#pragma once
#include "Sync.h"
#include <stddef.h>
#include <atomic>
#include <array>
#include <memory>
namespace vst {
// Fixed-size ring buffer FIFO based on two atomic indices.
//
// The write head is only modified by push()/emplace() and the read head only
// by pop()/clear(). The acquire/release pairs synchronize one pushing thread
// with one popping thread; concurrent pushes (or concurrent pops) are not
// synchronized by this class.
//
// An element is stored at the slot *after* the current write head and the
// FIFO counts as full when advancing the write head would make it equal to
// the read head. Consequently at most N - 1 elements can be queued at any
// time, although capacity() reports N.
template<typename T, size_t N>
class LockfreeFifo {
    // the index arithmetic below computes "% N"
    static_assert(N > 0, "LockfreeFifo: N must be greater than zero");
public:
    LockfreeFifo() = default;
    LockfreeFifo(const LockfreeFifo&) = delete;
    LockfreeFifo& operator=(const LockfreeFifo&) = delete;

    // Append a copy of 'data'. Returns false if the FIFO is full.
    bool push(const T& data){
        return emplace(data);
    }

    // Construct an element from 'args' and append it.
    // Returns false (without touching 'args') if the FIFO is full.
    template<typename... TArgs>
    bool emplace(TArgs&&... args){
        const int next = static_cast<int>(
            (writeHead_.load(std::memory_order_relaxed) + 1) % N);
        if (next == readHead_.load(std::memory_order_acquire)){
            return false; // FIFO is full
        }
        data_[next] = T { std::forward<TArgs>(args)... };
        // publish the new element to the reader
        writeHead_.store(next, std::memory_order_release);
        return true;
    }

    // Remove the oldest element and move it into 'data'.
    // Returns false (leaving 'data' untouched) if the FIFO is empty.
    bool pop(T& data){
        const int pos = readHead_.load(std::memory_order_relaxed);
        if (pos == writeHead_.load(std::memory_order_acquire)) {
            return false; // FIFO is empty
        }
        const int next = static_cast<int>((pos + 1) % N);
        // Move instead of copy: the slot is handed back to the writer right
        // below, so its old content is not needed anymore. This also allows
        // move-only element types. (Copy-only types still fall back to a copy.)
        data = std::move(data_[next]);
        // hand the slot back to the writer
        readHead_.store(next, std::memory_order_release);
        return true;
    }

    // Discard all pending elements by moving the read head to the write head.
    // The elements themselves are not destroyed or reset.
    void clear() {
        readHead_.store(writeHead_.load());
    }

    // True if read head and write head currently coincide.
    bool empty() const {
        return readHead_.load(std::memory_order_relaxed) == writeHead_.load(std::memory_order_relaxed);
    }

    // Number of slots in the underlying array (N).
    // NB: only N - 1 of them can be occupied at the same time, see above.
    size_t capacity() const { return N; }

    // raw data
    int readPos() const { return readHead_.load(std::memory_order_relaxed); }
    int writePos() const { return writeHead_.load(std::memory_order_relaxed); }
    T * data() { return data_.data(); }
    const T* data() const { return data_.data(); }
private:
    std::atomic<int> readHead_{0};  // index of the most recently popped slot
    std::atomic<int> writeHead_{0}; // index of the most recently written slot
    std::array<T, N> data_;
};
// Singly linked list node carrying one value of type T.
// The constructor forwards all of its arguments to the constructor of the
// payload; the link to the following node always starts out as null.
template<typename T>
struct Node {
    template<typename... Args>
    Node(Args&&... ctorArgs)
        : data_(std::forward<Args>(ctorArgs)...) {}

    Node * next_ = nullptr; // following node (null if there is none)
    T data_;                // payload
};
// Special MPSC queue implementation that can be safely created in a RT context.
// The required dummy node is a class member and therefore doesn't have to be allocated
// dynamically in the constructor. As a consequence, we need to be extra careful when
// freeing the nodes in the destructor (we must not delete the dummy node!)
// Multiple producers are synchronized with a simple spin lock.
// NB: the free list *could* be atomic, but we would need to be extra careful to avoid
// the ABA problem. (During a CAS loop the current node could be popped and pushed again,
// so that the CAS would succeed even though the object has changed.)
//
// Node layout: all nodes form a single linked chain
//     first_ -> ... -> devider_ -> ... -> last_
// * nodes in [first_, devider_) are empty and get recycled by emplace()
// * the node at devider_ always acts as a dummy
// * nodes in (devider_, last_] hold the pending items
template<typename T, typename Alloc = std::allocator<T>>
class UnboundedMPSCQueue : protected std::allocator_traits<Alloc>::template rebind_alloc<Node<T>> {
    typedef typename std::allocator_traits<Alloc>::template rebind_alloc<Node<T>> Base;
public:
    // Create an empty queue; does not allocate any nodes.
    UnboundedMPSCQueue(const Alloc& alloc = Alloc {}) : Base(alloc) {
        // add dummy node
        first_ = devider_ = last_ = &dummy_;
    }

    UnboundedMPSCQueue(const UnboundedMPSCQueue&) = delete;
    UnboundedMPSCQueue& operator=(const UnboundedMPSCQueue&) = delete;

    ~UnboundedMPSCQueue(){
        if (needRelease()) {
            freeMemory();
        }
    }

    // Make sure that at least 'n' empty nodes are available for recycling;
    // missing nodes are allocated and prepended to the free list.
    // not thread-safe!
    void reserve(size_t n){
        // check for existing empty nodes.
        // Stop as soon as the request is satisfied: 'n' is unsigned, so
        // decrementing it past zero would wrap around and make the loop
        // below allocate a practically unbounded number of nodes.
        auto it = first_;
        auto end = devider_.load();
        while (it != end && n > 0){
            n--;
            it = it->next_;
        }
        // add empty nodes
        while (n > 0){
            auto node = Base::allocate(1);
            new (node) Node<T>();
            node->next_ = first_;
            first_ = node;
            n--;
        }
    }

    // Append a copy of 'data' to the queue.
    void push(const T& data){
        emplace(data);
    }

    // Construct an item from 'args' and append it to the queue.
    // An empty node is recycled if one is available, otherwise a new node
    // is allocated.
    template<typename... TArgs>
    void emplace(TArgs&&... args){
        Node<T>* node = nullptr;
        {
            // try to reuse existing node
            std::lock_guard lock(lock_);
            if (first_ != devider_.load(std::memory_order_acquire)) {
                node = first_;
                first_ = first_->next_;
                node->next_ = nullptr; // !
            }
        }
        if (!node) {
            // allocate new node
            node = Base::allocate(1);
            new (node) Node<T>();
        }
        // NOTE(review): if this assignment throws, 'node' is neither linked
        // into the queue nor returned to the free list - confirm whether
        // throwing payload types need to be supported.
        node->data_ = T{std::forward<TArgs>(args)...};
        // push node
        std::lock_guard lock(lock_);
        auto last = last_.load(std::memory_order_relaxed);
        last->next_ = node;
        last_.store(node, std::memory_order_release); // publish
    }

    // Move the oldest pending item into 'result'.
    // Returns false (leaving 'result' untouched) if the queue is empty.
    bool pop(T& result){
        if (!empty()) {
            // use node *after* devider, because devider is always a dummy!
            auto next = devider_.load(std::memory_order_relaxed)->next_;
            result = std::move(next->data_);
            // the popped node becomes the new dummy; everything before it
            // may now be recycled by the producers
            devider_.store(next, std::memory_order_release); // publish
            return true;
        } else {
            return false;
        }
    }

    // True if there are no pending items.
    bool empty() const {
        return devider_.load(std::memory_order_relaxed)
                == last_.load(std::memory_order_acquire);
    }

    // Discard all pending items by moving the devider to the last node.
    // The nodes are kept for recycling; their payloads are not reset.
    void clear(){
        devider_.store(last_);
    }

    // Call 'fn' with each pending item, from oldest to newest.
    // not thread-safe!
    template<typename Func>
    void forEach(Func&& fn) {
        auto it = devider_.load(std::memory_order_relaxed)->next_;
        while (it) {
            fn(it->data_);
            it = it->next_;
        }
    }

    // Free all dynamically allocated nodes (pending items included) and
    // reset the queue to its initial state with only the dummy node.
    void release() {
        freeMemory();
        first_ = devider_ = last_ = &dummy_;
        dummy_.next_ = nullptr; // !
    }

    // True if the chain contains more than a single node.
    bool needRelease() const {
        return first_ != last_.load(std::memory_order_relaxed);
    }
private:
    Node<T>* first_;                 // head of the chain (start of the free list)
    std::atomic<Node<T> *> devider_; // current dummy; pending items start after it
    std::atomic<Node<T> *> last_;    // tail of the chain (most recently pushed node)
    SpinLock lock_;                  // guards first_ and the tail insertion
    Node<T> dummy_; // optimization

    // Destroy and deallocate every node of the chain except the embedded dummy.
    void freeMemory() {
        // only frees memory, doesn't reset pointers!
        auto it = first_;
        while (it){
            auto next = it->next_;
            if (it != &dummy_) {
                it->~Node<T>();
                Base::deallocate(it, 1);
            }
            it = next;
        }
    }
};
} // vst
|