File: queue.go

package info (click to toggle)
docker.io 26.1.5%2Bdfsg1-9
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 68,576 kB
  • sloc: sh: 5,748; makefile: 912; ansic: 664; asm: 228; python: 162
file content (158 lines) | stat: -rw-r--r-- 4,057 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package queue

import (
	"container/list"
	"fmt"
	"sync"

	"github.com/docker/go-events"
	"github.com/moby/swarmkit/v2/log"
)

// ErrQueueFull is returned by a Write operation when that Write causes the
// queue to reach its size limit.
var ErrQueueFull = fmt.Errorf("queue closed due to size limit")

// LimitQueue accepts all messages into a queue for asynchronous consumption by
// a sink until an upper limit of messages is reached. When that limit is
// reached, the entire Queue is Closed. It is thread safe but the
// sink must be reliable or events will be dropped.
// If a size of 0 is provided, the LimitQueue is considered limitless.
type LimitQueue struct {
	dst        events.Sink
	events     *list.List
	limit      uint64
	cond       *sync.Cond
	mu         sync.Mutex
	closed     bool
	full       chan struct{}
	fullClosed bool
}

// NewLimitQueue returns a queue to the provided Sink dst.
func NewLimitQueue(dst events.Sink, limit uint64) *LimitQueue {
	eq := LimitQueue{
		dst:    dst,
		events: list.New(),
		limit:  limit,
		full:   make(chan struct{}),
	}

	eq.cond = sync.NewCond(&eq.mu)
	go eq.run()
	return &eq
}

// Write accepts the events into the queue, only failing if the queue has
// been closed or has reached its size limit.
func (eq *LimitQueue) Write(event events.Event) error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return events.ErrSinkClosed
	}

	if eq.limit > 0 && uint64(eq.events.Len()) >= eq.limit {
		// If the limit has been reached, don't write the event to the queue,
		// and close the Full channel. This notifies listeners that the queue
		// is now full, but the sink is still permitted to consume events. It's
		// the responsibility of the listener to decide whether they want to
		// live with dropped events or whether they want to Close() the
		// LimitQueue
		if !eq.fullClosed {
			eq.fullClosed = true
			close(eq.full)
		}
		return ErrQueueFull
	}

	eq.events.PushBack(event)
	eq.cond.Signal() // signal waiters

	return nil
}

// Full returns a channel that is closed when the queue becomes full for the
// first time.
func (eq *LimitQueue) Full() chan struct{} {
	return eq.full
}

// Close shuts down the event queue, flushing all events
func (eq *LimitQueue) Close() error {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	if eq.closed {
		return nil
	}

	// set the closed flag
	eq.closed = true
	eq.cond.Signal() // signal flushes queue
	eq.cond.Wait()   // wait for signal from last flush
	return eq.dst.Close()
}

// run is the main goroutine to flush events to the target sink.
func (eq *LimitQueue) run() {
	for {
		event := eq.next()

		if event == nil {
			return // nil block means event queue is closed.
		}

		if err := eq.dst.Write(event); err != nil {
			// TODO(aaronl): Dropping events could be bad depending
			// on the application. We should have a way of
			// communicating this condition. However, logging
			// at a log level above debug may not be appropriate.
			// Eventually, go-events should not use logrus at all,
			// and should bubble up conditions like this through
			// error values.
			log.L.WithFields(log.Fields{
				"event": event,
				"sink":  eq.dst,
			}).WithError(err).Debug("eventqueue: dropped event")
		}
	}
}

// Len returns the number of items that are currently stored in the queue and
// not consumed by its sink.
func (eq *LimitQueue) Len() int {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return eq.events.Len()
}

func (eq *LimitQueue) String() string {
	eq.mu.Lock()
	defer eq.mu.Unlock()
	return fmt.Sprintf("%v", eq.events)
}

// next encompasses the critical section of the run loop. When the queue is
// empty, it will block on the condition. If new data arrives, it will wake
// and return a block. When closed, a nil slice will be returned.
func (eq *LimitQueue) next() events.Event {
	eq.mu.Lock()
	defer eq.mu.Unlock()

	for eq.events.Len() < 1 {
		if eq.closed {
			eq.cond.Broadcast()
			return nil
		}

		eq.cond.Wait()
	}

	front := eq.events.Front()
	block := front.Value.(events.Event)
	eq.events.Remove(front)

	return block
}