1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
package dissolve
import (
"sync"
)
// Job is a unit of work stored in the queue: a function that takes no
// arguments and reports its outcome as an error.
type Job func() error
// queue is an unbounded first-in, first-out queue of Jobs.
// The queue is goroutine safe.
// Inspired by http://blog.dubbelboer.com/2015/04/25/go-faster-queue.html (MIT)
type queue interface {
// Add appends a Job to the back of the queue.
// It returns false if the queue is closed; in that case the Job is dropped.
Add(Job) bool
// Remove takes the Job at the front of the queue.
// If false is returned, either 1) there were no items on the queue
// or 2) the queue is closed.
Remove() (Job, bool)
// Close closes the queue and discards all entries still in it.
// All goroutines blocked in Wait will return.
Close()
// Closed returns true if the queue has been closed.
// Another goroutine may close the queue right after the call returns,
// so only "true" has a definite meaning.
Closed() bool
// Wait blocks until a Job is added or the queue is closed.
// If there are items on the queue, the first one is returned immediately.
// It returns nil, false if the queue is closed.
// Otherwise the return value of Remove is returned.
Wait() (Job, bool)
}
// queueImpl is the queue implementation returned by newQueue: a ring buffer
// of Jobs whose fields are accessed by the methods only while holding mu.
type queueImpl struct {
// mu guards every field below.
mu sync.RWMutex
// cond is built on mu by newQueue; Add signals it and Close broadcasts on it.
cond *sync.Cond
// nodes is the ring buffer holding the queued Jobs.
nodes []Job
// head is the index in nodes of the next Job to remove.
head int
// tail is the index in nodes where the next added Job is stored.
tail int
// cnt is the number of Jobs currently queued.
cnt int
// size is reset to zero by Close; nothing in the visible code reads it.
size int
// closed is set by Close; once set, Add rejects new Jobs.
closed bool
// initCap is the capacity the queue was created with; Remove does not
// shrink nodes below it.
initCap int
}
// initialCapacity is the number of ring-buffer slots newQueue allocates for a
// freshly created queue.
var initialCapacity = 2
// newQueue returns a new, empty Job queue whose ring buffer starts with
// initialCapacity slots.
//
// A capacity below one is raised to one. With a zero-length buffer the first
// Add would request a zero-length resize, which panics on the modulo by zero
// there; a negative capacity would already panic in make.
func newQueue() queue {
	capacity := initialCapacity
	if capacity < 1 {
		capacity = 1
	}
	sq := &queueImpl{
		initCap: capacity,
		nodes:   make([]Job, capacity),
	}
	// The condition variable uses the queue's own mutex as its Locker, so
	// waiters and the methods mutating the queue synchronize on the same lock.
	sq.cond = sync.NewCond(&sq.mu)
	return sq
}
// resize moves the queued Jobs into a fresh ring buffer of n slots, laid out
// from index 0, and repoints head and tail accordingly.
// The write mutex must be held when calling.
func (q *queueImpl) resize(n int) {
	fresh := make([]Job, n)
	switch {
	case q.head < q.tail:
		// The live entries form one contiguous run.
		copy(fresh, q.nodes[q.head:q.tail])
	default:
		// The live entries wrap around the end of the old buffer: first
		// the run from head to the end, then the run from 0 up to tail.
		wrapAt := len(q.nodes) - q.head
		copy(fresh, q.nodes[q.head:])
		copy(fresh[wrapAt:], q.nodes[:q.tail])
	}
	// cnt%n is 0 when the new buffer is exactly full, so tail wraps to the
	// front.
	q.nodes, q.head, q.tail = fresh, 0, q.cnt%n
}
// Add appends a Job to the back of the queue and signals one goroutine
// blocked on the queue's condition variable.
// It returns false if the queue is closed; in that case the Job is dropped.
func (q *queueImpl) Add(i Job) bool {
	q.mu.Lock()
	defer q.mu.Unlock()

	if q.closed {
		return false
	}

	if full := q.cnt == len(q.nodes); full {
		// Also tested a grow rate of 1.5, see: http://stackoverflow.com/questions/2269063/buffer-growth-strategy
		// In Go this resulted in a higher memory usage.
		q.resize(2 * q.cnt)
	}

	// Read tail only after a possible resize, which relocates it.
	slot := q.tail
	q.nodes[slot] = i
	q.tail = (slot + 1) % len(q.nodes)
	q.cnt++

	q.cond.Signal()
	return true
}
// Close marks the queue as closed and discards every entry still in it.
// All goroutines blocked on the queue's condition variable are woken up.
func (q *queueImpl) Close() {
	q.mu.Lock()

	// Drop the buffer and zero the counters so the discarded Jobs become
	// unreachable through the queue.
	q.nodes = nil
	q.cnt, q.size = 0, 0
	q.closed = true

	q.cond.Broadcast()
	q.mu.Unlock()
}
// Closed reports whether the queue has been closed.
// The flag is read under the read lock; another goroutine may close the
// queue right after the call returns, so only "true" has a definite meaning.
func (q *queueImpl) Closed() bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return q.closed
}
// Wait blocks until a Job is available or the queue is closed.
// If there are items on the queue, the first one is returned immediately.
// It returns nil, false only if the queue is closed.
//
// The wait runs in a loop and the Job is taken while the mutex is still held.
// A wake-up is only a hint: by the time this goroutine reacquires the mutex,
// another consumer may have taken the Job, so the emptiness check is repeated
// instead of reporting a false "closed" on a queue that is still open.
func (q *queueImpl) Wait() (Job, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()

	for q.cnt == 0 && !q.closed {
		q.cond.Wait()
	}
	if q.closed {
		return nil, false
	}
	return q.removeLocked()
}

// Remove takes the Job at the front of the queue without blocking.
// If false is returned, either 1) there were no items on the queue
// or 2) the queue is closed.
func (q *queueImpl) Remove() (Job, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.removeLocked()
}

// removeLocked pops the Job at head, or returns nil, false when the queue
// holds no Jobs. The write mutex must be held when calling.
func (q *queueImpl) removeLocked() (Job, bool) {
	if q.cnt == 0 {
		return nil, false
	}

	job := q.nodes[q.head]
	// Clear the vacated slot so the buffer does not keep the Job (and
	// whatever its closure captured) reachable until the slot is reused.
	q.nodes[q.head] = nil
	q.head = (q.head + 1) % len(q.nodes)
	q.cnt--

	// Halve the buffer once it is at most half full, but never go below
	// the capacity the queue was created with.
	if n := len(q.nodes) / 2; n >= q.initCap && q.cnt <= n {
		q.resize(n)
	}
	return job, true
}
|