File: driver_loki_batch.go

package info (click to toggle)
incus 6.0.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 24,392 kB
  • sloc: sh: 16,313; ansic: 3,121; python: 457; makefile: 337; ruby: 51; sql: 50; lisp: 6
file content (95 lines) | stat: -rw-r--r-- 2,091 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package logging

import (
	"encoding/json"
	"time"
)

// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request.
type batch struct {
	streams   map[string]*Stream
	bytes     int
	createdAt time.Time
}

func newBatch(entries ...entry) *batch {
	b := &batch{
		streams:   map[string]*Stream{},
		bytes:     0,
		createdAt: time.Now(),
	}

	// Add entries to the batch
	for _, entry := range entries {
		b.add(entry)
	}

	return b
}

// add an entry to the batch.
func (b *batch) add(entry entry) {
	b.bytes += len(entry.Line)

	// Append the entry to an already existing stream (if any)
	labels := entry.labels.String()

	stream, ok := b.streams[labels]
	if ok {
		stream.Entries = append(stream.Entries, entry.Entry)
		return
	}

	// Add the entry as a new stream
	b.streams[labels] = &Stream{
		Labels:  entry.labels,
		Entries: []Entry{entry.Entry},
	}
}

// sizeBytesAfter returns the size of the batch after the input entry
// will be added to the batch itself.
func (b *batch) sizeBytesAfter(entry entry) int {
	return b.bytes + len(entry.Line)
}

// age of the batch since its creation.
func (b *batch) age() time.Duration {
	return time.Since(b.createdAt)
}

// encode the batch as push request, and returns the encoded bytes and the number of encoded
// entries.
func (b *batch) encode() ([]byte, int, error) {
	req, entriesCount := b.createPushRequest()

	buf, err := json.Marshal(req)
	if err != nil {
		return nil, 0, err
	}

	return buf, entriesCount, nil
}

// creates push request and returns it, together with number of entries.
func (b *batch) createPushRequest() (*PushRequest, int) {
	req := PushRequest{
		Streams: make([]*Stream, 0, len(b.streams)),
	}

	entriesCount := 0

	for _, stream := range b.streams {
		req.Streams = append(req.Streams, stream)
		entriesCount += len(stream.Entries)
	}

	return &req, entriesCount
}

// empty returns true if streams is empty.
func (b *batch) empty() bool {
	return len(b.streams) == 0
}