1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
package loggerutils
import (
"context"
"sync"
"github.com/docker/docker/daemon/logger"
"github.com/pkg/errors"
)
// MessageQueue is a queue for log messages.
//
// [MessageQueue.Enqueue] will block when the queue is full.
// To dequeue messages call [MessageQueue.Receiver] and pull messages off the
// returned channel.
//
// Closing only prevents new messages from being added to the queue.
// The queue can still be drained after close.
//
// The zero value of MessageQueue is safe to use, but does not do any internal
// buffering (queue size is 0).
type MessageQueue struct {
maxSize int
mu sync.Mutex
closing bool
closed chan struct{}
// Blocks multiple calls to [MessageQueue.Close] until the queue is actually closed
closeWait chan struct{}
// We need to be able to safely close the send channel so that [MessageQueue.Dequeue]
// can drain the queue without blocking.
// This cond var helps deal with that.
cond *sync.Cond
sendWaiters int
ch chan *logger.Message
}
// NewMessageQueue creates a new queue with the specified size.
func NewMessageQueue(maxSize int) *MessageQueue {
var q MessageQueue
q.maxSize = maxSize
q.init()
return &q
}
func (q *MessageQueue) init() {
if q.cond == nil {
q.cond = sync.NewCond(&q.mu)
}
if q.ch == nil {
q.ch = make(chan *logger.Message, q.maxSize)
}
if q.closed == nil {
q.closed = make(chan struct{})
}
if q.closeWait == nil {
q.closeWait = make(chan struct{})
}
}
var ErrQueueClosed = errors.New("queue is closed")
// Enqueue adds the provided message to the queue.
// Enqueue blocks if the queue is full.
//
// The two possible error cases are:
// 1. The provided context is cancelled
// 2. [ErrQueueClosed] when the queue has been closed.
func (q *MessageQueue) Enqueue(ctx context.Context, m *logger.Message) error {
q.mu.Lock()
q.init()
// Increment the waiter count
// This prevents the send channel from being closed while we are trying to send.
q.sendWaiters++
q.mu.Unlock()
defer func() {
q.mu.Lock()
// Decrement the waiter count and signal to any potential closer to check
// the wait count again.
// Only bother signaling if this is the last waiter.
q.sendWaiters--
if q.sendWaiters == 0 {
q.cond.Signal()
}
q.mu.Unlock()
}()
// Before trying to send on the channel, check if we care closed.
select {
case <-ctx.Done():
return ctx.Err()
case <-q.closed:
return ErrQueueClosed
default:
}
select {
case <-ctx.Done():
return ctx.Err()
case <-q.closed:
return ErrQueueClosed
case q.ch <- m:
return nil
}
}
// Close prevents any new messages from being added to the queue.
func (q *MessageQueue) Close() {
q.mu.Lock()
q.init()
if q.closing {
// unlock the mutex here so that the goroutine waiting on the cond var can
// take the lock when signaled.
q.mu.Unlock()
<-q.closeWait
return
}
defer q.mu.Unlock()
// Prevent multiple Close calls from trying to close things.
q.closing = true
close(q.closed)
// Wait for any senders to finish
// Because we closed the channel above, this shouldn't block for a long period.
for q.sendWaiters > 0 {
q.cond.Wait()
}
close(q.ch)
close(q.closeWait)
}
// Receiver returns a channel that can be used to dequeue messages
// The channel will be closed when the message queue is closed but may have
// messages buffered.
func (q *MessageQueue) Receiver() <-chan *logger.Message {
q.mu.Lock()
defer q.mu.Unlock()
q.init()
return q.ch
}
|