1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
package loggerutils
import (
"context"
"testing"
"time"
"github.com/docker/docker/daemon/logger"
"gotest.tools/v3/assert"
)
// TestQueue exercises MessageQueue in three scenarios:
//
//  1. A queue created with NewMessageQueue(2): messages enqueued before Close
//     must still be delivered to the receiver, after which the receive
//     channel reports that no more messages are available.
//  2. A zero-value (unbuffered) MessageQueue: Enqueue hands off to a waiting
//     receiver, and an already-cancelled context yields context.Canceled.
//  3. A sender blocked in Enqueue while Close is called: the sender must
//     return ErrQueueClosed rather than panic, and a second Close must be
//     harmless.
func TestQueue(t *testing.T) {
	// Upper bound for every wait in this test, so a regression fails with a
	// message instead of hanging until the global test timeout.
	const waitTimeout = 5 * time.Second

	q := NewMessageQueue(2)
	msg := &logger.Message{Line: []byte("hello")}
	ctx := context.Background()

	// Fail immediately if this enqueue errors: nothing else feeds the queue
	// at this point, so the receive below would otherwise block forever.
	err := q.Enqueue(ctx, msg)
	assert.NilError(t, err)

	recv := q.Receiver()
	// These pointer values should be the same
	assert.Equal(t, msg, <-recv)

	err = q.Enqueue(ctx, msg)
	assert.Check(t, err)
	err = q.Enqueue(ctx, msg)
	assert.Check(t, err)
	q.Close()

	// We have 2 messages in the queue.
	// Even though the queue is closed, both must still be received.
	assert.Equal(t, msg, <-recv)
	assert.Equal(t, msg, <-recv)

	// This should not block and should report that the channel is drained.
	_, more := <-recv
	assert.Check(t, !more, "expected no more messages in the queue")

	// Test with unbuffered
	q = &MessageQueue{}
	recv = q.Receiver()

	// Buffered with capacity 1 so the sending goroutine can always deliver
	// its result and exit, even if the test has already failed.
	chAdd := make(chan error, 1)
	go func() {
		chAdd <- q.Enqueue(ctx, msg)
	}()
	assert.Equal(t, msg, <-recv)
	assert.NilError(t, <-chAdd)

	// An already-cancelled context must be rejected with context.Canceled.
	ctxC, cancel := context.WithCancel(ctx)
	cancel()
	err = q.Enqueue(ctxC, msg)
	assert.ErrorIs(t, err, context.Canceled)

	// Test that blocked senders do not cause a panic on close.
	// This check is useful because the underlying implementation uses channels
	// with the send channel eventually getting closed when q.Close is called.
	go func() {
		chAdd <- q.Enqueue(ctx, msg)
	}()

	// Wait for enqueue to be ready (or as close to ready as it can be).
	deadline := time.Now().Add(waitTimeout)
	for {
		q.mu.Lock()
		blocked := q.sendWaiters == 1
		q.mu.Unlock()
		if blocked {
			break
		}
		if time.Now().After(deadline) {
			t.Fatal("timed out waiting for Enqueue to register as a blocked sender")
		}
		time.Sleep(time.Millisecond)
	}

	q.Close()
	select {
	case <-time.After(waitTimeout):
		// A sender that never returns is exactly the failure this test
		// guards against; it must not pass silently.
		t.Fatal("timed out waiting for blocked Enqueue to return after Close")
	case err := <-chAdd:
		assert.ErrorIs(t, err, ErrQueueClosed)
	}

	// Double-close should not cause any issues
	q.Close()
}
|