1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
package queue
import (
"container/ring"
)
// Holder provides synchronized access to a *Queue[T].
type Holder[T any] struct {
// these channels work in tandem to provide exclusive access to the underlying *Queue[T].
// each channel is created with a buffer size of one.
// empty behaves like a mutex when there's one or more messages in the queue.
// populated is like a semaphore when the queue is empty.
// the *Queue[T] is only ever in one channel. which channel depends on if it contains any items.
// the initial state is for empty to contain an empty queue.
empty chan *Queue[T]
populated chan *Queue[T]
}
// NewHolder creates a new Holder[T] that contains the provided *Queue[T].
func NewHolder[T any](q *Queue[T]) *Holder[T] {
h := &Holder[T]{
empty: make(chan *Queue[T], 1),
populated: make(chan *Queue[T], 1),
}
h.Release(q)
return h
}
// Acquire attempts to acquire the *Queue[T]. If the *Queue[T] has already been acquired the call blocks.
// When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition.
func (h *Holder[T]) Acquire() *Queue[T] {
// the queue will be in only one of the channels, it doesn't matter which one
var q *Queue[T]
select {
case q = <-h.empty:
// empty queue
case q = <-h.populated:
// populated queue
}
return q
}
// Wait returns a channel that's signaled when the *Queue[T] contains at least one item.
// When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition.
func (h *Holder[T]) Wait() <-chan *Queue[T] {
return h.populated
}
// Release returns the *Queue[T] back to the Holder[T].
// Once the *Queue[T] has been released, it is no longer safe to call its methods.
func (h *Holder[T]) Release(q *Queue[T]) {
if q.Len() == 0 {
h.empty <- q
} else {
h.populated <- q
}
}
// Len returns the length of the *Queue[T].
func (h *Holder[T]) Len() int {
msgLen := 0
select {
case q := <-h.empty:
h.empty <- q
case q := <-h.populated:
msgLen = q.Len()
h.populated <- q
}
return msgLen
}
// Queue[T] is a segmented FIFO queue of Ts.
type Queue[T any] struct {
head *ring.Ring
tail *ring.Ring
size int
}
// New creates a new instance of Queue[T].
// - size is the size of each Queue segment
func New[T any](size int) *Queue[T] {
r := &ring.Ring{
Value: &segment[T]{
items: make([]*T, size),
},
}
return &Queue[T]{
head: r,
tail: r,
}
}
// Enqueue adds the specified item to the end of the queue.
// If the current segment is full, a new segment is created.
func (q *Queue[T]) Enqueue(item T) {
for {
r := q.tail
seg := r.Value.(*segment[T])
if seg.tail < len(seg.items) {
seg.items[seg.tail] = &item
seg.tail++
q.size++
return
}
// segment is full, can we advance?
if next := r.Next(); next != q.head {
q.tail = next
continue
}
// no, add a new ring
r.Link(&ring.Ring{
Value: &segment[T]{
items: make([]*T, len(seg.items)),
},
})
q.tail = r.Next()
}
}
// Dequeue removes and returns the item from the front of the queue.
func (q *Queue[T]) Dequeue() *T {
r := q.head
seg := r.Value.(*segment[T])
if seg.tail == 0 {
// queue is empty
return nil
}
// remove first item
item := seg.items[seg.head]
seg.items[seg.head] = nil
seg.head++
q.size--
if seg.head == seg.tail {
// segment is now empty, reset indices
seg.head, seg.tail = 0, 0
// if we're not at the last ring, advance head to the next one
if q.head != q.tail {
q.head = r.Next()
}
}
return item
}
// Len returns the total count of enqueued items.
func (q *Queue[T]) Len() int {
return q.size
}
type segment[T any] struct {
items []*T
head int
tail int
}
|