package queue

import (
	"container/list"
	"fmt"
	"sync"

	"github.com/docker/go-events"
	"github.com/moby/swarmkit/v2/log"
)

// ErrQueueFull is returned by Write when the queue has reached its size
// limit; the offending event is dropped rather than enqueued.
var ErrQueueFull = fmt.Errorf("queue closed due to size limit")

// LimitQueue accepts all messages into a queue for asynchronous consumption
// by a sink until an upper limit of messages is reached. When that limit is
// reached, subsequent writes fail with ErrQueueFull and the Full channel is
// closed, but the sink keeps draining events already queued. It is thread
// safe, but the sink must be reliable or events will be dropped.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
	dst        events.Sink   // destination sink for drained events
	events     *list.List    // FIFO of pending events
	limit      uint64        // maximum queued events; 0 means unlimited
	cond       *sync.Cond    // wakes the run loop on writes and on close
	mu         sync.Mutex
	closed     bool
	full       chan struct{} // closed once the queue first fills
	fullClosed bool          // guards against closing full twice
}

// NewLimitQueue returns a LimitQueue that delivers events to the provided
// sink dst, rejecting writes once limit events are pending.
func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
	eq := LimitQueue{
		dst:    dst,
		events: list.New(),
		limit:  limit,
		full:   make(chan struct{}),
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}
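
// A minimal usage sketch (the sink here is hypothetical; any events.Sink
// implementation works):
//
//	sink := newLoggingSink()       // hypothetical events.Sink implementation
//	q := NewLimitQueue(sink, 1024) // reject writes beyond 1024 pending events
//	defer q.Close()
//
//	if err := q.Write(ev); err != nil {
//		// err is ErrQueueFull or events.ErrSinkClosed; see Write below.
//	}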

// Write accepts an event into the queue, failing only if the queue has been
// closed or has reached its size limit.
func (eq *LimitQueue) Write(event events.Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return events.ErrSinkClosed
	}

	if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
		// If the limit has been reached, don't write the event to the queue,
		// and close the Full channel. This notifies listeners that the queue
		// is now full, but the sink is still permitted to consume events. It's
		// the responsibility of the listener to decide whether they want to
		// live with dropped events or whether they want to Close() the
		// LimitQueue.
		if !eq.fullClosed {
			eq.fullClosed = true
			close(eq.full)
		}
		return ErrQueueFull
	}

	eq.events.PushBack(event)
	eq.cond.Signal() // signal waiters
	return nil
}
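
// Callers can tell a full queue apart from a closed one by the error value; a
// sketch (q and ev are assumed from the surrounding program):
//
//	switch err := q.Write(ev); err {
//	case nil:
//		// event queued for the sink
//	case ErrQueueFull:
//		// event dropped; the sink still drains what is already queued
//	case events.ErrSinkClosed:
//		// queue was closed; stop writing
//	}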

// Full returns a channel that is closed when the queue becomes full for the
// first time.
func (eq *LimitQueue) Full() chan struct{} {
	return eq.full
}
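
// One way to react to saturation, sketched here with an illustrative policy
// (closing the queue); a listener may instead choose to tolerate drops:
//
//	go func() {
//		<-q.Full() // closed once the limit is first hit
//		q.Close()
//	}()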

// Close shuts down the event queue, flushing all queued events to the sink
// before closing it.
func (eq *LimitQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return nil
	}

	// Set the closed flag and wake the run loop; next() broadcasts back once
	// the queue has drained.
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush
	return eq.dst.Close()
}
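
// Close blocks until the run loop has flushed every already-queued event, so
// a shutdown sequence only needs to stop producers first; a sketch with a
// hypothetical stopProducers:
//
//	stopProducers() // hypothetical: ensure no more Write calls
//	if err := q.Close(); err != nil {
//		// err comes from closing the underlying sink
//	}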

// run is the main goroutine to flush events to the target sink.
func (eq *LimitQueue) run() {
	for {
		event := eq.next()

		if event == nil {
			return // nil event means the queue is closed
		}

		if err := eq.dst.Write(event); err != nil {
			// TODO(aaronl): Dropping events could be bad depending
			// on the application. We should have a way of
			// communicating this condition. However, logging
			// at a log level above debug may not be appropriate.
			// Eventually, go-events should not use logrus at all,
			// and should bubble up conditions like this through
			// error values.
			log.L.WithFields(log.Fields{
				"event": event,
				"sink":  eq.dst,
			}).WithError(err).Debug("eventqueue: dropped event")
		}
	}
}

// Len returns the number of items that are currently stored in the queue and
// not consumed by its sink.
func (eq *LimitQueue) Len() int {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return eq.events.Len()
}

// String returns a string representation of the queue's pending events.
func (eq *LimitQueue) String() string {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return fmt.Sprintf("%v", eq.events)
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it blocks on the condition variable. When new data arrives, it wakes
// and returns an event. When the queue is closed and drained, it broadcasts
// to any waiters (notably Close) and returns nil.
func (eq *LimitQueue) next() events.Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	event := front.Value.(events.Event)
	eq.events.Remove(front)

	return event
}