File: queue.go

package info (click to toggle)
golang-github-azure-go-amqp 1.0.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,192 kB
  • sloc: makefile: 22
file content (162 lines) | stat: -rw-r--r-- 3,692 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package queue

import (
	"container/ring"
)

// Holder provides synchronized access to a *Queue[T].
type Holder[T any] struct {
	// these channels work in tandem to provide exclusive access to the underlying *Queue[T].
	// each channel is created with a buffer size of one.
	// empty behaves like a mutex when there's one or more messages in the queue.
	// populated is like a semaphore when the queue is empty.
	// the *Queue[T] is only ever in one channel. which channel depends on if it contains any items.
	// the initial state is for empty to contain an empty queue.
	empty     chan *Queue[T]
	populated chan *Queue[T]
}

// NewHolder creates a new Holder[T] that contains the provided *Queue[T].
func NewHolder[T any](q *Queue[T]) *Holder[T] {
	h := &Holder[T]{
		empty:     make(chan *Queue[T], 1),
		populated: make(chan *Queue[T], 1),
	}
	h.Release(q)
	return h
}

// Acquire attempts to acquire the *Queue[T]. If the *Queue[T] has already been acquired the call blocks.
// When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition.
func (h *Holder[T]) Acquire() *Queue[T] {
	// the queue will be in only one of the channels, it doesn't matter which one
	var q *Queue[T]
	select {
	case q = <-h.empty:
		// empty queue
	case q = <-h.populated:
		// populated queue
	}
	return q
}

// Wait returns a channel that's signaled when the *Queue[T] contains at least one item.
// When the *Queue[T] is no longer required, you MUST call Release() to relinquish acquisition.
func (h *Holder[T]) Wait() <-chan *Queue[T] {
	return h.populated
}

// Release returns the *Queue[T] back to the Holder[T].
// Once the *Queue[T] has been released, it is no longer safe to call its methods.
func (h *Holder[T]) Release(q *Queue[T]) {
	if q.Len() == 0 {
		h.empty <- q
	} else {
		h.populated <- q
	}
}

// Len returns the length of the *Queue[T].
func (h *Holder[T]) Len() int {
	msgLen := 0
	select {
	case q := <-h.empty:
		h.empty <- q
	case q := <-h.populated:
		msgLen = q.Len()
		h.populated <- q
	}
	return msgLen
}

// Queue[T] is a segmented FIFO queue of Ts.
type Queue[T any] struct {
	head *ring.Ring
	tail *ring.Ring
	size int
}

// New creates a new instance of Queue[T].
//   - size is the size of each Queue segment
func New[T any](size int) *Queue[T] {
	r := &ring.Ring{
		Value: &segment[T]{
			items: make([]*T, size),
		},
	}
	return &Queue[T]{
		head: r,
		tail: r,
	}
}

// Enqueue adds the specified item to the end of the queue.
// If the current segment is full, a new segment is created.
func (q *Queue[T]) Enqueue(item T) {
	for {
		r := q.tail
		seg := r.Value.(*segment[T])

		if seg.tail < len(seg.items) {
			seg.items[seg.tail] = &item
			seg.tail++
			q.size++
			return
		}

		// segment is full, can we advance?
		if next := r.Next(); next != q.head {
			q.tail = next
			continue
		}

		// no, add a new ring
		r.Link(&ring.Ring{
			Value: &segment[T]{
				items: make([]*T, len(seg.items)),
			},
		})

		q.tail = r.Next()
	}
}

// Dequeue removes and returns the item from the front of the queue.
func (q *Queue[T]) Dequeue() *T {
	r := q.head
	seg := r.Value.(*segment[T])

	if seg.tail == 0 {
		// queue is empty
		return nil
	}

	// remove first item
	item := seg.items[seg.head]
	seg.items[seg.head] = nil
	seg.head++
	q.size--

	if seg.head == seg.tail {
		// segment is now empty, reset indices
		seg.head, seg.tail = 0, 0

		// if we're not at the last ring, advance head to the next one
		if q.head != q.tail {
			q.head = r.Next()
		}
	}

	return item
}

// Len returns the total count of enqueued items.
func (q *Queue[T]) Len() int {
	return q.size
}

type segment[T any] struct {
	items []*T
	head  int
	tail  int
}