File: spscqueue.h

package info (click to toggle)
readerwriterqueue 1.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 388 kB
  • sloc: cpp: 2,876; makefile: 79
file content (139 lines) | stat: -rw-r--r-- 3,495 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include "../../../atomicops.h"
#include <cstdlib>		// For std::size_t

// From http://www.1024cores.net/home/lock-free-algorithms/queues/unbounded-spsc-queue
// (and http://software.intel.com/en-us/articles/single-producer-single-consumer-queue)

// load with 'consume' (data-dependent) memory ordering
// Reads *addr through a volatile lvalue, then issues a compiler-only fence so
// the compiler cannot hoist later dependent reads above this load.
// NOTE(review): no hardware fence is emitted — this is correct only on
// strongly-ordered CPUs (x86/x64), as the comment below states; it is not a
// portable consume load for weakly-ordered architectures.
template<typename T> 
T load_consume(T const* addr) 
{ 
    // hardware fence is implicit on x86 
    T v = *const_cast<T const volatile*>(addr); 
    moodycamel::compiler_fence(moodycamel::memory_order_seq_cst);
    return v; 
} 

// store with 'release' memory ordering
// Issues a compiler-only fence, then writes *addr through a volatile lvalue,
// so all prior initialization of the pointed-to data is compiled before the
// publishing store.  Pairs with load_consume() on the reader side.
// NOTE(review): like load_consume, relies on x86's strong hardware ordering —
// only a compiler barrier is used; not portable to weakly-ordered CPUs.
template<typename T> 
void store_release(T* addr, T v) 
{ 
    // hardware fence is implicit on x86 
    moodycamel::compiler_fence(moodycamel::memory_order_seq_cst);
    *const_cast<T volatile*>(addr) = v; 
} 

// Cache-line size (bytes) assumed for modern x86 processors; used to pad
// the queue so producer-side and consumer-side fields land on distinct lines.
const std::size_t cache_line_size = 64;
// single-producer/single-consumer queue 
// Unbounded lock-free SPSC queue (Vyukov, 1024cores.net).  Exactly one
// thread may call enqueue() (the producer) and exactly one thread may call
// try_dequeue() (the consumer); any other concurrent use is a data race.
// The queue always contains one dummy/sentinel node, so "empty" means
// tail_->next_ == 0.  Nodes consumed by try_dequeue are later recycled by
// the producer through the first_/tail_copy_ node cache, so steady-state
// operation rarely touches the heap.
template<typename T> 
class spsc_queue 
{ 
public: 
  // Constructs an empty queue: allocates the single sentinel node and points
  // every internal pointer at it.
  spsc_queue() 
  { 
      node* n = new node; 
      n->next_ = 0; 
      tail_ = head_ = first_= tail_copy_ = n; 
  }

  // Constructs an empty queue whose node cache already holds ~prealloc
  // nodes, by enqueueing and then dequeueing `prealloc` dummy values.
  // Requires T to be default-constructible.  Single-threaded only (runs
  // before the queue is shared).
  explicit spsc_queue(size_t prealloc)
  {
      node* n = new node;
      n->next_ = 0;
      tail_ = head_ = first_ = tail_copy_ = n;

      // [CD] Not (at all) the most efficient way to pre-allocate memory, but it works
      T dummy = T();
      for (size_t i = 0; i != prealloc; ++i) {
          enqueue(dummy);
      }
      for (size_t i = 0; i != prealloc; ++i) {
          try_dequeue(dummy);
      }
  }

  // Deletes every node in the chain starting at first_ (the oldest cached
  // node) through to head_, including the sentinel.  The do/while is safe
  // because the constructor guarantees at least one node exists.
  // Must not run concurrently with enqueue()/try_dequeue().
  ~spsc_queue() 
  { 
      node* n = first_; 
      do 
      { 
          node* next = n->next_; 
          delete n; 
          n = next; 
      } 
      while (n); 
  } 

  // Producer only.  Fills a (possibly recycled) node, then publishes it to
  // the consumer via store_release on head_->next_ — the release fence
  // guarantees the consumer never observes a partially-initialized node.
  void enqueue(T v) 
  { 
      node* n = alloc_node(); 
      n->next_ = 0; 
      n->value_ = v; 
      store_release(&head_->next_, n); 
      head_ = n; 
  } 

  // Consumer only.
  // returns 'false' if queue is empty 
  // The value is copied out BEFORE tail_ is advanced: the release store on
  // tail_ is what licenses the producer (via alloc_node) to recycle nodes
  // up to the new tail_, so the copy must happen first.
  bool try_dequeue(T& v) 
  { 
      if (load_consume(&tail_->next_)) 
      { 
          v = tail_->next_->value_; 
          store_release(&tail_, tail_->next_); 
          return true; 
      } 
      else 
      { 
          return false; 
      } 
  } 

private: 
  // internal node structure 
  // NOTE(review): value_ in a recycled node is not destroyed until the node
  // itself is deleted or overwritten by a later enqueue, so a dequeued T may
  // outlive its logical removal from the queue.
  struct node 
  { 
      node* next_; // singly-linked; 0 marks the last (sentinel) node
      T value_; 
  }; 

  // consumer part 
  // accessed mainly by consumer, infrequently be producer 
  node* tail_; // tail of the queue 

  // delimiter between consumer part and producer part, 
  // so that they situated on different cache lines 
  char cache_line_pad_ [cache_line_size]; 

  // producer part 
  // accessed only by producer 
  node* head_; // head of the queue 
  node* first_; // last unused node (tail of node cache) 
  node* tail_copy_; // helper (points somewhere between first_ and tail_) 

  // Producer only.  Returns a node for the next enqueue.
  node* alloc_node() 
  { 
      // first tries to allocate node from internal node cache, 
      // if attempt fails, allocates node via ::operator new() 

      // Fast path: a node is available in the cached range [first_, tail_copy_).
      if (first_ != tail_copy_) 
      { 
          node* n = first_; 
          first_ = first_->next_; 
          return n; 
      } 
      // Cache looks empty — refresh tail_copy_ from tail_ (published by the
      // consumer with release ordering) and retry once.
      tail_copy_ = load_consume(&tail_); 
      if (first_ != tail_copy_) 
      { 
          node* n = first_; 
          first_ = first_->next_; 
          return n; 
      } 
      // Nothing to recycle: fall back to the heap.
      node* n = new node; 
      return n; 
  } 

  // Non-copyable (pre-C++11 idiom: declared private, never defined).
  spsc_queue(spsc_queue const&); 
  spsc_queue& operator = (spsc_queue const&); 

};