1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
|
package queue
import (
"sync"
)
// Queue is an unbounded FIFO queue of []byte values that is safe for
// concurrent use by multiple goroutines.
// Inspired by http://blog.dubbelboer.com/2015/04/25/go-faster-queue.html (MIT)
type Queue interface {
	// Add appends b to the back of the queue.
	// It reports false, and drops b, if the queue is closed.
	Add(b []byte) bool

	// Remove takes the []byte at the front of the queue.
	// A false result means either that the queue was empty or that it
	// has been closed.
	Remove() (b []byte, ok bool)

	// Close closes the queue and discards every entry still in it.
	// All goroutines blocked in Wait are woken up.
	Close()

	// CloseRemaining closes the queue and returns the entries that were
	// still in it. All goroutines blocked in Wait are woken up.
	CloseRemaining() [][]byte

	// Closed reports whether the queue has been closed.
	// Another goroutine may close the queue right after this call
	// returns, so only a true result is definitive.
	Closed() bool

	// Wait blocks until a []byte is added or the queue is closed.
	// It returns true immediately if the queue already holds items and
	// false if the queue is closed.
	Wait() bool

	// Cap returns the number of entries the queue can hold without
	// allocating.
	Cap() int

	// Len returns the number of entries currently in the queue.
	Len() int

	// Size returns the total number of bytes currently in the queue.
	Size() int
}
// byteQueue is the ring-buffer backed implementation of Queue.
// All fields are read and written with mu held.
type byteQueue struct {
	mu   sync.RWMutex
	cond *sync.Cond // signalled by Add, broadcast when the queue is closed

	// nodes is the ring buffer that stores the queued slices.
	nodes [][]byte

	// head is the index in nodes of the next entry to remove; tail is the
	// index at which the next added entry is stored.
	head, tail int

	// cnt is the number of queued entries; size is the sum of their lengths.
	cnt, size int

	closed bool // set once the queue has been closed

	// initCap is the buffer length below which Remove never shrinks nodes.
	initCap int
}
// initialCapacity is the number of slots the ring buffer of a queue
// created by New starts with.
var initialCapacity = 2

// New returns an empty, open Queue whose ring buffer has initialCapacity
// slots. The queue's condition variable is built on the queue's own mutex.
func New() Queue {
	q := new(byteQueue)
	q.initCap = initialCapacity
	q.nodes = make([][]byte, initialCapacity)
	q.cond = sync.NewCond(&q.mu)
	return q
}
// resize replaces the ring buffer with a new one of length n, moving the
// queued entries to its start in queue order and resetting head to 0.
// The caller must hold the write lock.
func (q *byteQueue) resize(n int) {
	old := q.nodes
	fresh := make([][]byte, n)

	if q.head >= q.tail {
		// The live entries wrap around the end of the old buffer (or the
		// buffer is completely full): copy the part from head to the end
		// first, then the part from the start up to tail right behind it.
		front := old[q.head:]
		copy(fresh, front)
		copy(fresh[len(front):], old[:q.tail])
	} else {
		// The live entries form one contiguous run.
		copy(fresh, old[q.head:q.tail])
	}

	q.tail = q.cnt % n
	q.head = 0
	q.nodes = fresh
}
// Add appends b to the back of the queue and signals one goroutine
// blocked in Wait. It reports false, and drops b, if the queue is closed.
func (q *byteQueue) Add(b []byte) bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.closed {
		return false
	}

	if full := q.cnt == len(q.nodes); full {
		// Double the ring buffer. A grow rate of 1.5 was also tested, see:
		// http://stackoverflow.com/questions/2269063/buffer-growth-strategy
		// In Go this resulted in a higher memory usage.
		q.resize(2 * q.cnt)
	}

	slot := q.tail
	q.nodes[slot] = b
	q.tail = (slot + 1) % len(q.nodes)
	q.cnt++
	q.size += len(b)

	q.cond.Signal()
	return true
}
// Close marks the queue as closed, drops the ring buffer together with
// every entry still in it, and wakes all goroutines blocked in Wait.
func (q *byteQueue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Release the buffer and reset the counters that describe it.
	q.nodes, q.cnt, q.size = nil, 0, 0
	q.closed = true

	q.cond.Broadcast()
}
// CloseRemaining closes the queue and returns the entries that were still
// queued, oldest first. All goroutines blocked in Wait are woken up.
// If the queue was already closed an empty, non-nil slice is returned.
func (q *byteQueue) CloseRemaining() [][]byte {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.closed {
		return [][]byte{}
	}

	// Pop entries from the head until the queue is empty.
	drained := make([][]byte, 0, q.cnt)
	for ; q.cnt > 0; q.cnt-- {
		drained = append(drained, q.nodes[q.head])
		q.head = (q.head + 1) % len(q.nodes)
	}

	q.nodes, q.size = nil, 0
	q.closed = true
	q.cond.Broadcast()
	return drained
}
// Closed reports whether the queue has been closed.
// Another goroutine may close the queue right after the read lock is
// released, so only a true result is definitive.
func (q *byteQueue) Closed() bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.closed
}
// Wait blocks until the queue holds at least one entry or is closed.
// It returns true when entries are available (immediately, if the queue
// is already non-empty) and false when the queue is closed, including
// when it is closed while the caller is blocked.
func (q *byteQueue) Wait() bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	// A wake-up alone proves nothing: the queue may have been closed, or
	// another goroutine may already have removed the entry that caused the
	// signal. Re-check the condition every time cond.Wait returns.
	for !q.closed && q.cnt == 0 {
		q.cond.Wait()
	}
	return !q.closed
}
// Remove takes the entry at the front of the queue and returns it with
// true. It returns nil, false when there is nothing to remove, which is
// the case both for an empty queue and for a closed one.
func (q *byteQueue) Remove() ([]byte, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.cnt == 0 {
		return nil, false
	}

	i := q.nodes[q.head]
	// Clear the vacated slot so the ring buffer no longer references the
	// removed slice; otherwise the payload stays reachable until the slot
	// is overwritten or the buffer is resized.
	q.nodes[q.head] = nil
	q.head = (q.head + 1) % len(q.nodes)
	q.cnt--
	q.size -= len(i)

	// Halve the buffer once the remaining entries fit into half of it,
	// but never shrink below the initial capacity.
	if n := len(q.nodes) / 2; n >= q.initCap && q.cnt <= n {
		q.resize(n)
	}
	return i, true
}
// Cap returns the capacity of the ring buffer, i.e. the number of
// entries the queue can hold without allocating.
func (q *byteQueue) Cap() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return cap(q.nodes)
}
// Len returns the number of entries currently in the queue.
func (q *byteQueue) Len() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.cnt
}
// Size returns the total number of bytes held by the entries currently
// in the queue.
func (q *byteQueue) Size() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.size
}
|