//go:build linux && cgo && !static_build && journald

package journald // import "github.com/docker/docker/daemon/logger/journald"

import (
	"sync"
	"testing"
	"time"

	"github.com/coreos/go-systemd/v22/journal"
	"gotest.tools/v3/assert"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/journald/internal/fake"
	"github.com/docker/docker/daemon/logger/loggertest"
)

func TestLogRead(t *testing.T) {
	r := loggertest.Reader{
		Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
			journalDir := t.TempDir()

			// Fill the journal with irrelevant events which the
			// LogReader needs to filter out.
			rotatedJournal := fake.NewT(t, journalDir+"/rotated.journal")
			rotatedJournal.AssignEventTimestampFromSyslogTimestamp = true
			l, err := new(logger.Info{
				ContainerID:   "wrongone0001",
				ContainerName: "fake",
			})
			assert.NilError(t, err)
			l.sendToJournal = rotatedJournal.Send
			assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stdout of a different container in a rotated journal file")}))
			assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stderr of a different container in a rotated journal file")}))
			assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in a rotated journal", journal.PriInfo, nil))

			activeJournal := fake.NewT(t, journalDir+"/fake.journal")
			activeJournal.AssignEventTimestampFromSyslogTimestamp = true
			l, err = new(logger.Info{
				ContainerID:   "wrongone0002",
				ContainerName: "fake",
			})
			assert.NilError(t, err)
			l.sendToJournal = activeJournal.Send
			assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stdout of a different container in the active journal file")}))
			assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-1 * 30 * time.Minute), Line: []byte("stderr of a different container in the active journal file")}))
			assert.NilError(t, activeJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil))

			return func(t *testing.T) logger.Logger {
				l, err := new(info)
				assert.NilError(t, err)
				l.journalReadDir = journalDir

				sl := &syncLogger{journald: l, waiters: map[uint64]chan<- struct{}{}}

				s := make(chan sendit, 100)
				t.Cleanup(func() { close(s) })
				// Forward each queued message to the active journal once
				// its delay has elapsed, then wake any Sync call that is
				// waiting on that message count.
				go func() {
					for m := range s {
						<-m.after
						activeJournal.Send(m.message, m.priority, m.vars)
						sl.mu.Lock()
						sl.sent++
						if notify, ok := sl.waiters[sl.sent]; ok {
							delete(sl.waiters, sl.sent)
							close(notify)
						}
						sl.mu.Unlock()
					}
				}()

				// Queue messages instead of writing them directly, so
				// that each one reaches the journal only after a delay.
				l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error {
					sl.mu.Lock()
					sl.queued++
					sl.mu.Unlock()
					s <- sendit{
						message:  message,
						priority: priority,
						vars:     vars,
						after:    time.After(150 * time.Millisecond),
					}
					return nil
				}
				l.readSyncTimeout = 3 * time.Second
				return sl
			}
		},
	}
	t.Run("Tail", r.TestTail)
	t.Run("Follow", r.TestFollow)
}

type sendit struct {
	message  string
	priority journal.Priority
	vars     map[string]string
	after    <-chan time.Time
}

type syncLogger struct {
	*journald

	mu           sync.Mutex
	queued, sent uint64
	waiters      map[uint64]chan<- struct{}
}

// Sync blocks until every message queued so far has been sent to the
// fake journal.
func (l *syncLogger) Sync() error {
	l.mu.Lock()
	waitFor := l.queued
	if l.sent >= l.queued {
		l.mu.Unlock()
		return nil
	}
	notify := make(chan struct{})
	l.waiters[waitFor] = notify
	l.mu.Unlock()
	<-notify
	return nil
}

// Close waits for all queued messages to be sent, then closes the
// underlying journald logger.
func (l *syncLogger) Close() error {
	_ = l.Sync()
	return l.journald.Close()
}