#include "../../../atomicops.h"
#include <cstdlib> // For std::size_t
// From http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
// (and http://software.intel.com/en-us/articles/single-producer-single-consumer-queue)
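//
// Overview: this is an unbounded, singly-linked-list based single-producer/
// single-consumer queue. The consumer owns tail_; the producer owns head_,
// first_ and tail_copy_. Nodes the consumer has moved past are not freed:
// the producer reclaims them through alloc_node()'s node cache (the chain
// between first_ and tail_copy_), so nodes are reused rather than repeatedly
// allocated. A cache-line-sized pad separates the consumer and producer
// members to avoid false sharing.
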
// load with 'consume' (data-dependent) memory ordering
template<typename T>
T load_consume(T const* addr)
{
    // hardware fence is implicit on x86
    T v = *const_cast<T const volatile*>(addr);
    moodycamel::compiler_fence(moodycamel::memory_order_seq_cst);
    return v;
}

// store with 'release' memory ordering
template<typename T>
void store_release(T* addr, T v)
{
    // hardware fence is implicit on x86
    moodycamel::compiler_fence(moodycamel::memory_order_seq_cst);
    *const_cast<T volatile*>(addr) = v;
}
// cache line size on modern x86 processors (in bytes)
size_t const cache_line_size = 64;
// single-producer/single-consumer queue
template<typename T>
class spsc_queue
{
public:
    spsc_queue()
    {
        node* n = new node;
        n->next_ = 0;
        tail_ = head_ = first_ = tail_copy_ = n;
    }

    explicit spsc_queue(size_t prealloc)
    {
        node* n = new node;
        n->next_ = 0;
        tail_ = head_ = first_ = tail_copy_ = n;
        // [CD] Not (at all) the most efficient way to pre-allocate memory, but it works
        T dummy = T();
        for (size_t i = 0; i != prealloc; ++i) {
            enqueue(dummy);
        }
        for (size_t i = 0; i != prealloc; ++i) {
            try_dequeue(dummy);
        }
    }

    ~spsc_queue()
    {
        // delete every node in the chain, both cached nodes and nodes still in the queue
        node* n = first_;
        do
        {
            node* next = n->next_;
            delete n;
            n = next;
        }
        while (n);
    }

    // enqueues one element (must be called from the producer thread only)
    void enqueue(T v)
    {
        node* n = alloc_node();
        n->next_ = 0;
        n->value_ = v;
        // publish the new node to the consumer with release semantics
        store_release(&head_->next_, n);
        head_ = n;
    }

    // returns 'false' if the queue is empty (must be called from the consumer thread only)
    bool try_dequeue(T& v)
    {
        // consume-load the link published by the producer in enqueue()
        if (load_consume(&tail_->next_))
        {
            v = tail_->next_->value_;
            // release tail_ so the producer (in alloc_node) can safely recycle the consumed node
            store_release(&tail_, tail_->next_);
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    // internal node structure
    struct node
    {
        node* next_;
        T value_;
    };

    // consumer part
    // accessed mainly by the consumer, infrequently by the producer
    node* tail_; // tail of the queue

    // delimiter between the consumer part and the producer part,
    // so that they sit on different cache lines
    char cache_line_pad_[cache_line_size];

    // producer part
    // accessed only by the producer
    node* head_;      // head of the queue
    node* first_;     // last unused node (tail of node cache)
    node* tail_copy_; // helper (points somewhere between first_ and tail_)

    node* alloc_node()
    {
        // first tries to allocate a node from the internal node cache;
        // if that attempt fails, allocates a node via ::operator new()
        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_;
            return n;
        }
        // refresh the cached copy of tail_ and retry the node cache
        tail_copy_ = load_consume(&tail_);
        if (first_ != tail_copy_)
        {
            node* n = first_;
            first_ = first_->next_;
            return n;
        }
        node* n = new node;
        return n;
    }

    // copying is disabled (declared but never defined)
    spsc_queue(spsc_queue const&);
    spsc_queue& operator = (spsc_queue const&);
};
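
// Usage sketch (illustrative only; not part of the original queue code).
// A minimal example assuming a C++11 compiler: one std::thread acts as the
// producer while the calling thread acts as the consumer. The function name
// spsc_queue_example and the element count are arbitrary. Wrapped in '#if 0'
// so the contents of this header are unchanged when compiled.
#if 0
#include <thread>
#include <cstdio>

inline void spsc_queue_example()
{
    spsc_queue<int> q(32); // pre-allocate 32 nodes in the internal node cache

    // producer thread: the only thread allowed to call enqueue()
    std::thread producer([&q]() {
        for (int i = 0; i < 1000; ++i)
            q.enqueue(i);
    });

    // consumer (this thread): the only thread allowed to call try_dequeue()
    int received = 0;
    int value;
    while (received < 1000)
    {
        if (q.try_dequeue(value))
        {
            ++received; // values arrive in FIFO order: 0, 1, 2, ...
        }
    }

    producer.join();
    std::printf("received %d items\n", received);
}
#endif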