File: read_test.go

package info (click to toggle)
docker.io 27.5.1%2Bdfsg4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 67,384 kB
  • sloc: sh: 5,847; makefile: 1,146; ansic: 664; python: 162; asm: 133
file content (124 lines) | stat: -rw-r--r-- 3,867 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//go:build linux && cgo && !static_build && journald

package journald // import "github.com/docker/docker/daemon/logger/journald"

import (
	"sync"
	"testing"
	"time"

	"github.com/coreos/go-systemd/v22/journal"
	"gotest.tools/v3/assert"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/journald/internal/fake"
	"github.com/docker/docker/daemon/logger/loggertest"
)

// TestLogRead runs the shared loggertest.Reader suite ("Tail" and "Follow")
// against the journald log driver. Writes are redirected to fake journal
// files in a per-test temporary directory, and the reader is pointed at that
// same directory.
func TestLogRead(t *testing.T) {
	r := loggertest.Reader{
		Factory: func(t *testing.T, info logger.Info) func(*testing.T) logger.Logger {
			journalDir := t.TempDir()

			// Fill the journal with irrelevant events which the
			// LogReader needs to filter out.
			rotatedJournal := fake.NewT(t, journalDir+"/rotated.journal")
			rotatedJournal.AssignEventTimestampFromSyslogTimestamp = true
			l, err := new(logger.Info{
				ContainerID:   "wrongone0001",
				ContainerName: "fake",
			})
			assert.NilError(t, err)
			l.sendToJournal = rotatedJournal.Send
			assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-30 * time.Minute), Line: []byte("stdout of a different container in a rotated journal file")}))
			assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-30 * time.Minute), Line: []byte("stderr of a different container in a rotated journal file")}))
			assert.NilError(t, rotatedJournal.Send("a log message from a totally different process in a rotated journal", journal.PriInfo, nil))

			activeJournal := fake.NewT(t, journalDir+"/fake.journal")
			activeJournal.AssignEventTimestampFromSyslogTimestamp = true
			l, err = new(logger.Info{
				ContainerID:   "wrongone0002",
				ContainerName: "fake",
			})
			assert.NilError(t, err)
			l.sendToJournal = activeJournal.Send
			assert.NilError(t, l.Log(&logger.Message{Source: "stdout", Timestamp: time.Now().Add(-30 * time.Minute), Line: []byte("stdout of a different container in the active journal file")}))
			assert.NilError(t, l.Log(&logger.Message{Source: "stderr", Timestamp: time.Now().Add(-30 * time.Minute), Line: []byte("stderr of a different container in the active journal file")}))
			// This entry belongs in the active journal, as its text says;
			// it must not be written to the rotated one.
			assert.NilError(t, activeJournal.Send("a log message from a totally different process in the active journal", journal.PriInfo, nil))

			return func(t *testing.T) logger.Logger {
				l, err := new(info)
				assert.NilError(t, err)
				l.journalReadDir = journalDir
				sl := &syncLogger{journald: l, waiters: map[uint64]chan<- struct{}{}}

				// Entries are handed to a single goroutine which writes
				// them to the active journal in order, each one no earlier
				// than 150ms after it was queued. Closing the channel on
				// cleanup lets the goroutine drain what is left and exit.
				s := make(chan sendit, 100)
				t.Cleanup(func() { close(s) })
				go func() {
					for m := range s {
						<-m.after
						// Best effort: this goroutine may still be draining
						// after the test function has returned, so the
						// error is deliberately not reported through t.
						_ = activeJournal.Send(m.message, m.priority, m.vars)
						sl.mu.Lock()
						sl.sent++
						// Wake the Sync call waiting for exactly this many
						// entries to have been written, if there is one.
						if notify, ok := sl.waiters[sl.sent]; ok {
							delete(sl.waiters, sl.sent)
							close(notify)
						}
						sl.mu.Unlock()
					}
				}()

				// Log never writes to the journal directly; it only counts
				// the entry and queues it for delayed delivery above.
				l.sendToJournal = func(message string, priority journal.Priority, vars map[string]string) error {
					sl.mu.Lock()
					sl.queued++
					sl.mu.Unlock()
					s <- sendit{
						message:  message,
						priority: priority,
						vars:     vars,
						after:    time.After(150 * time.Millisecond),
					}
					return nil
				}
				l.readSyncTimeout = 3 * time.Second
				return sl
			}
		},
	}
	t.Run("Tail", r.TestTail)
	t.Run("Follow", r.TestFollow)
}

// sendit is one journal entry queued by the sendToJournal hook installed in
// TestLogRead, to be written to the fake journal by the sender goroutine.
type sendit struct {
	// message, priority and vars are passed unchanged to the fake
	// journal's Send.
	message  string
	priority journal.Priority
	vars     map[string]string
	// after is received from before the entry is written; TestLogRead
	// sets it to a 150ms time.After channel created when the entry is queued.
	after    <-chan time.Time
}

// syncLogger wraps a journald logger whose journal writes are delivered
// asynchronously, and tracks how many entries have been queued and how many
// have actually been written so that Sync can wait for the two to meet.
type syncLogger struct {
	*journald

	// mu guards queued, sent and waiters.
	mu sync.Mutex
	// queued is the number of entries handed to the sendToJournal hook.
	queued uint64
	// sent is the number of entries the sender goroutine has written.
	sent uint64
	// waiters maps a sent count to the channel that is closed once that
	// many entries have been written.
	waiters map[uint64]chan<- struct{}
}

// Sync blocks until every entry queued at the time of the call has been
// written by the sender goroutine. It returns immediately when nothing is
// outstanding, and always returns nil.
func (l *syncLogger) Sync() error {
	// done stays nil when there is nothing to wait for.
	var done chan struct{}

	l.mu.Lock()
	if target := l.queued; l.sent < target {
		// Register under the queued count seen now; the sender closes the
		// channel when its sent counter reaches that value.
		done = make(chan struct{})
		l.waiters[target] = done
	}
	l.mu.Unlock()

	// Wait outside the lock so the sender can make progress.
	if done != nil {
		<-done
	}
	return nil
}

// Close waits for all queued entries to be written and then closes the
// wrapped journald logger, returning that logger's Close error.
func (l *syncLogger) Close() error {
	// Drain first; the result of Sync is intentionally discarded.
	_ = l.Sync()

	wrapped := l.journald
	return wrapped.Close()
}