File: queue.go

package info (click to toggle)
docker.io 28.5.2%2Bdfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 69,048 kB
  • sloc: sh: 5,867; makefile: 863; ansic: 184; python: 162; asm: 159
file content (156 lines) | stat: -rw-r--r-- 3,473 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package loggerutils

import (
	"context"
	"sync"

	"github.com/docker/docker/daemon/logger"
	"github.com/pkg/errors"
)

// MessageQueue is a queue for log messages.
//
// [MessageQueue.Enqueue] will block when the queue is full.
// To dequeue messages call [MessageQueue.Receiver] and pull messages off the
// returned channel.
//
// Closing only prevents new messages from being added to the queue.
// The queue can still be drained after close.
//
// The zero value of MessageQueue is safe to use, but does not do any internal
// buffering (queue size is 0).
type MessageQueue struct {
	maxSize int

	mu      sync.Mutex
	closing bool
	closed  chan struct{}

	// Blocks multiple calls to [MessageQueue.Close] until the queue is actually closed
	closeWait chan struct{}

	// We need to be able to safely close the send channel so that [MessageQueue.Dequeue]
	// can drain the queue without blocking.
	// This cond var helps deal with that.
	cond        *sync.Cond
	sendWaiters int

	ch chan *logger.Message
}

// NewMessageQueue creates a new queue with the specified size.
func NewMessageQueue(maxSize int) *MessageQueue {
	var q MessageQueue
	q.maxSize = maxSize
	q.init()
	return &q
}

func (q *MessageQueue) init() {
	if q.cond == nil {
		q.cond = sync.NewCond(&q.mu)
	}

	if q.ch == nil {
		q.ch = make(chan *logger.Message, q.maxSize)
	}

	if q.closed == nil {
		q.closed = make(chan struct{})
	}

	if q.closeWait == nil {
		q.closeWait = make(chan struct{})
	}
}

var ErrQueueClosed = errors.New("queue is closed")

// Enqueue adds the provided message to the queue.
// Enqueue blocks if the queue is full.
//
// The two possible error cases are:
// 1. The provided context is cancelled
// 2. [ErrQueueClosed] when the queue has been closed.
func (q *MessageQueue) Enqueue(ctx context.Context, m *logger.Message) error {
	q.mu.Lock()
	q.init()

	// Increment the waiter count
	// This prevents the send channel from being closed while we are trying to send.
	q.sendWaiters++
	q.mu.Unlock()

	defer func() {
		q.mu.Lock()
		// Decrement the waiter count and signal to any potential closer to check
		// the wait count again.
		// Only bother signaling if this is the last waiter.
		q.sendWaiters--
		if q.sendWaiters == 0 {
			q.cond.Signal()
		}
		q.mu.Unlock()
	}()

	// Before trying to send on the channel, check if we care closed.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-q.closed:
		return ErrQueueClosed
	default:
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-q.closed:
		return ErrQueueClosed
	case q.ch <- m:
		return nil
	}
}

// Close prevents any new messages from being added to the queue.
func (q *MessageQueue) Close() {
	q.mu.Lock()

	q.init()

	if q.closing {
		// unlock the mutex here so that the goroutine waiting on the cond var can
		// take the lock when signaled.
		q.mu.Unlock()
		<-q.closeWait
		return
	}

	defer q.mu.Unlock()

	// Prevent multiple Close calls from trying to close things.
	q.closing = true

	close(q.closed)

	// Wait for any senders to finish
	// Because we closed the channel above, this shouldn't block for a long period.
	for q.sendWaiters > 0 {
		q.cond.Wait()
	}

	close(q.ch)
	close(q.closeWait)
}

// Receiver returns a channel that can be used to dequeue messages
// The channel will be closed when the message queue is closed but may have
// messages buffered.
func (q *MessageQueue) Receiver() <-chan *logger.Message {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.init()

	return q.ch
}