1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
package queue
import (
"fmt"
"sync"
"testing"
"time"
"github.com/docker/go-events"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)
// mockSink is a test sink whose Write blocks until the test releases it
// through holdChan, and which records every event it accepts.
type mockSink struct {
	// holdChan gates Write: each call receives from it before touching any
	// other state. Close closes it.
	holdChan chan struct{}

	// once ensures Close marks the sink closed and closes holdChan only a
	// single time.
	once sync.Once

	// mutex guards closed and data.
	mutex  sync.Mutex
	closed bool
	data   []events.Event
}
// Write blocks until a value can be received from holdChan (or the channel
// has been closed) and then records the event. It returns
// events.ErrSinkClosed, without recording anything, if the sink has already
// been closed.
func (s *mockSink) Write(event events.Event) error {
	// Wait for the test to let this write through.
	<-s.holdChan

	s.mutex.Lock()
	defer s.mutex.Unlock()

	if !s.closed {
		s.data = append(s.data, event)
		return nil
	}
	return events.ErrSinkClosed
}
// Close marks the sink as closed and closes holdChan, which lets any Write
// waiting on the channel proceed. Only the first call has an effect; it
// always returns nil.
func (s *mockSink) Close() error {
	shutdown := func() {
		s.closed = true
		close(s.holdChan)
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// sync.Once keeps repeated Close calls from closing holdChan twice.
	s.once.Do(shutdown)
	return nil
}
// Len returns the number of events the sink has recorded so far.
func (s *mockSink) Len() int {
	s.mutex.Lock()
	count := len(s.data)
	s.mutex.Unlock()

	return count
}
// String renders the recorded events using fmt's default formatting.
func (s *mockSink) String() string {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	rendered := fmt.Sprintf("%v", s.data)
	return rendered
}
// TestLimitQueueNoLimit checks that a limit queue created with a limit of 0
// keeps accepting events while the sink is blocked, and that every event
// reaches the sink once it is released.
func TestLimitQueueNoLimit(t *testing.T) {
	const (
		totalEvents  = 10000
		pollInterval = 20 * time.Millisecond
		pollTimeout  = 5 * time.Second
	)

	req := require.New(t)

	release := make(chan struct{})
	sink := &mockSink{holdChan: release}

	// Create a limit queue with no limit. Events written to it should be
	// held in the queue until the sink is unblocked.
	q := NewLimitQueue(sink, 0)
	defer q.Close()
	defer sink.Close()

	// waitUntil polls done until it reports true or pollTimeout elapses. It
	// never fails the test itself; the assertions that follow each call do.
	waitUntil := func(done func() bool) {
		deadline := time.Now().Add(pollTimeout)
		for time.Now().Before(deadline) && !done() {
			time.Sleep(pollInterval)
		}
	}

	// The first event should leave the queue and then block in the sink's
	// write phase, so neither the queue nor the sink reports it.
	req.NoError(q.Write("test event"))
	waitUntil(func() bool { return q.Len() == 0 })
	req.Equal(0, q.Len())
	req.Equal(0, sink.Len())

	// Every further event has to wait in the queue behind the blocked one.
	for written := 1; written < totalEvents; written++ {
		req.NoError(q.Write("test event"))
	}
	req.Equal(totalEvents-1, q.Len()) // 1 event blocked in the sink, the rest waiting in the queue
	req.Equal(0, sink.Len())

	// Let the sink through once per event and expect the queue to drain
	// completely into it.
	for released := 0; released < totalEvents; released++ {
		release <- struct{}{}
	}
	waitUntil(func() bool { return sink.Len() == totalEvents })
	req.Equal(0, q.Len())
	req.Equal(totalEvents, sink.Len())
}
// TestLimitQueueWithLimit ensures that a limit queue created with a limit of
// 10 rejects writes with ErrQueueFull while 10 events are waiting, and accepts
// a write again each time the sink takes one event off the queue.
func TestLimitQueueWithLimit(t *testing.T) {
	const (
		queueLimit   = 10
		pollInterval = 20 * time.Millisecond
		pollTimeout  = 5 * time.Second
	)

	req := require.New(t)

	release := make(chan struct{})
	sink := &mockSink{holdChan: release}

	// Create a limit queue holding at most queueLimit events. Events stay in
	// the queue until the sink is unblocked.
	q := NewLimitQueue(sink, queueLimit)
	defer q.Close()
	defer sink.Close()

	// waitUntil polls done until it reports true or pollTimeout elapses. It
	// never fails the test itself; the assertions that follow each call do.
	waitUntil := func(done func() bool) {
		deadline := time.Now().Add(pollTimeout)
		for time.Now().Before(deadline) && !done() {
			time.Sleep(pollInterval)
		}
	}

	// releaseOneAndRefill lets the sink complete one write, waits for the
	// queue to drop to one below the limit, then writes one event back.
	// delivered is the number of events the sink should hold afterwards.
	releaseOneAndRefill := func(delivered int) {
		release <- struct{}{}
		waitUntil(func() bool { return q.Len() == queueLimit-1 })
		req.Equal(queueLimit-1, q.Len())
		req.Equal(delivered, sink.Len())

		req.NoError(q.Write("test event"))
		req.Equal(queueLimit, q.Len())
		req.Equal(delivered, sink.Len())
	}

	// expectQueueFull writes to the full queue and checks that the write is
	// rejected with ErrQueueFull without changing the queue or the sink.
	expectQueueFull := func(delivered int) {
		err := q.Write("test event")
		req.Error(err)
		req.Equal(ErrQueueFull, err)
		req.Equal(queueLimit, q.Len())
		req.Equal(delivered, sink.Len())
	}

	// Write the first event and wait for it to block on the writer.
	req.NoError(q.Write("test event"))
	waitUntil(func() bool { return q.Len() == 0 })
	req.Equal(0, sink.Len())
	req.Equal(0, q.Len())

	// Fill up the queue.
	for written := 0; written < queueLimit; written++ {
		req.NoError(q.Write("test event"))
	}
	req.Equal(0, sink.Len())
	req.Equal(queueLimit, q.Len())

	// Once the sink reads one event there is room to write one more.
	releaseOneAndRefill(1)

	// The queue is full again, so the next write must be rejected.
	logrus.Debugf("Closing queue")
	expectQueueFull(1)

	// Further writes should return the same error.
	expectQueueFull(1)

	// Reading one more event from the sink allows exactly one more write.
	releaseOneAndRefill(2)
	expectQueueFull(2)
}
|