File: queue_test.go

package info (click to toggle)
docker.io 20.10.24%2Bdfsg1-1%2Bdeb12u1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 60,824 kB
  • sloc: sh: 5,621; makefile: 593; ansic: 179; python: 162; asm: 7
file content (176 lines) | stat: -rw-r--r-- 4,295 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package queue

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/docker/go-events"
	"github.com/sirupsen/logrus"
	"github.com/stretchr/testify/require"
)

type mockSink struct {
	closed   bool
	holdChan chan struct{}
	data     []events.Event
	mutex    sync.Mutex
	once     sync.Once
}

func (s *mockSink) Write(event events.Event) error {
	<-s.holdChan

	s.mutex.Lock()
	defer s.mutex.Unlock()
	if s.closed {
		return events.ErrSinkClosed
	}
	s.data = append(s.data, event)
	return nil
}

func (s *mockSink) Close() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	s.once.Do(func() {
		s.closed = true
		close(s.holdChan)
	})
	return nil
}

func (s *mockSink) Len() int {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return len(s.data)
}

func (s *mockSink) String() string {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return fmt.Sprintf("%v", s.data)
}

func TestLimitQueueNoLimit(t *testing.T) {
	require := require.New(t)
	ch := make(chan struct{})
	ms := &mockSink{
		holdChan: ch,
	}

	// Create a limit queue with no limit and store 10k events. The events
	// should be held in the queue until we unblock the sink.
	q := NewLimitQueue(ms, 0)
	defer q.Close()
	defer ms.Close()

	// Writing one event to the queue should block during the sink write phase
	require.NoError(q.Write("test event"))

	// Make sure the consumer goroutine receives the event
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && q.Len() != 0 {
		time.Sleep(20 * time.Millisecond)
	}
	require.Equal(0, q.Len())
	require.Equal(0, ms.Len())

	for i := 0; i < 9999; i++ {
		require.NoError(q.Write("test event"))
	}
	require.Equal(9999, q.Len()) // 1 event blocked in the sink, 9999 waiting in the queue
	require.Equal(0, ms.Len())

	// Unblock the sink and expect all the events to have been flushed out of
	// the queue.
	for i := 0; i < 10000; i++ {
		ch <- struct{}{}
	}
	deadline = time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && ms.Len() != 10000 {
		time.Sleep(20 * time.Millisecond)
	}

	require.Equal(0, q.Len())
	require.Equal(10000, ms.Len())
}

// TestLimitQueueWithLimit ensures that the limit queue works with a limit.
func TestLimitQueueWithLimit(t *testing.T) {
	require := require.New(t)
	ch := make(chan struct{})
	ms := &mockSink{
		holdChan: ch,
	}

	// Create a limit queue with no limit and store 10k events. The events should be held in
	// the queue until we unblock the sink.
	q := NewLimitQueue(ms, 10)
	defer q.Close()
	defer ms.Close()

	// Write the first event and wait for it to block on the writer
	require.NoError(q.Write("test event"))
	deadline := time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && q.Len() != 0 {
		time.Sleep(20 * time.Millisecond)
	}
	require.Equal(0, ms.Len())
	require.Equal(0, q.Len())

	// Fill up the queue
	for i := 0; i < 10; i++ {
		require.NoError(q.Write("test event"))
	}
	require.Equal(0, ms.Len())
	require.Equal(10, q.Len())

	// Reading one event by the sink should allow us to write one more back
	// without closing the queue.
	ch <- struct{}{}
	deadline = time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && q.Len() != 9 {
		time.Sleep(20 * time.Millisecond)
	}
	require.Equal(9, q.Len())
	require.Equal(1, ms.Len())
	require.NoError(q.Write("test event"))
	require.Equal(10, q.Len())
	require.Equal(1, ms.Len())

	// Trying to write a new event in the queue should flush it
	logrus.Debugf("Closing queue")
	err := q.Write("test event")
	require.Error(err)
	require.Equal(ErrQueueFull, err)
	require.Equal(10, q.Len())
	require.Equal(1, ms.Len())

	// Further writes should return the same error
	err = q.Write("test event")
	require.Error(err)
	require.Equal(ErrQueueFull, err)
	require.Equal(10, q.Len())
	require.Equal(1, ms.Len())

	// Reading one event from the sink will allow one more write to go through again
	ch <- struct{}{}
	deadline = time.Now().Add(5 * time.Second)
	for time.Now().Before(deadline) && q.Len() != 9 {
		time.Sleep(20 * time.Millisecond)
	}
	require.Equal(9, q.Len())
	require.Equal(2, ms.Len())
	require.NoError(q.Write("test event"))
	require.Equal(10, q.Len())
	require.Equal(2, ms.Len())

	err = q.Write("test event")
	require.Error(err)
	require.Equal(ErrQueueFull, err)
	require.Equal(10, q.Len())
	require.Equal(2, ms.Len())
}