1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
|
// Copyright 2020 New Relic Corporation. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package internal
import (
"bytes"
"container/heap"
"github.com/newrelic/go-agent/internal/jsonx"
)
// analyticsEvent is a single event held in an analyticsEvents pool: a
// priority used for heap ordering plus an embedded jsonWriter.
type analyticsEvent struct {
// priority is compared with isLowerPriority to decide heap ordering and
// whether a new event may replace a stored one.
priority Priority
// jsonWriter is embedded; CollectorJSON calls WriteJSON on each event.
jsonWriter
}
// analyticsEventHeap is a slice of analyticsEvent that is handed to the
// container/heap functions; its Less method orders events by priority.
type analyticsEventHeap []analyticsEvent
// analyticsEvents is a capacity-bounded pool of analytics events together
// with bookkeeping counters.
type analyticsEvents struct {
// numSeen is incremented for every event passed to addEvent, whether or
// not the event is kept.
numSeen int
// events holds the retained events; its capacity is the pool's limit.
events analyticsEventHeap
// failedHarvests is set by mergeFailed to the merged pool's failure count
// plus one.
failedHarvests int
}
// NumSeen reports, as a float64, how many events have been offered to the
// pool.
func (events *analyticsEvents) NumSeen() float64 {
	seen := events.numSeen
	return float64(seen)
}
// NumSaved reports, as a float64, how many events the pool currently holds.
func (events *analyticsEvents) NumSaved() float64 {
	saved := len(events.events)
	return float64(saved)
}
// Len returns the number of events in the heap.
func (h analyticsEventHeap) Len() int {
	n := len(h)
	return n
}
// Less reports whether the event at index i has lower priority than the
// event at index j.
func (h analyticsEventHeap) Less(i, j int) bool {
	left, right := h[i].priority, h[j].priority
	return left.isLowerPriority(right)
}
// Swap exchanges the events at indexes i and j.
func (h analyticsEventHeap) Swap(i, j int) {
	held := h[i]
	h[i] = h[j]
	h[j] = held
}
// Push and Pop are unused: only heap.Init and heap.Fix are used.

// Push is a deliberate no-op; the argument x is discarded.
func (h analyticsEventHeap) Push(x interface{}) {
}
// Pop is a placeholder that removes nothing and always returns nil.
func (h analyticsEventHeap) Pop() interface{} {
	return nil
}
// newAnalyticsEvents returns an empty pool whose event slice is allocated
// with length zero and capacity max. The counters start at zero.
func newAnalyticsEvents(max int) *analyticsEvents {
	pool := new(analyticsEvents)
	pool.events = make(analyticsEventHeap, 0, max)
	return pool
}
// capacity returns the maximum number of events the pool can hold, which is
// the capacity of the underlying event slice.
func (events *analyticsEvents) capacity() int {
	limit := cap(events.events)
	return limit
}
// addEvent counts e as seen and then tries to store it. While the pool has
// room, e is appended. Once the pool is full, e replaces the event at the
// heap root unless e has lower priority than that event.
func (events *analyticsEvents) addEvent(e analyticsEvent) {
	events.numSeen++

	limit := events.capacity()
	switch {
	case limit == 0:
		// Configurable event harvest limits may be zero.
		return

	case len(events.events) < limit:
		events.events = append(events.events, e)
		// Heap initialization is delayed until the pool fills so that
		// ordering stays deterministic for integration tests (which do
		// not reach the max).
		if len(events.events) == limit {
			heap.Init(events.events)
		}
		return
	}

	// The pool is full: keep e only if it is not lower priority than the
	// event currently at the heap root.
	root := events.events[0].priority
	if !e.priority.isLowerPriority(root) {
		events.events[0] = e
		heap.Fix(events.events, 0)
	}
}
// mergeFailed merges the events of a pool whose harvest failed back into this
// pool. The failure count of other is incremented by one; if that reaches
// failedEventsAttemptsLimit, nothing is merged and this pool is unchanged.
func (events *analyticsEvents) mergeFailed(other *analyticsEvents) {
	attempts := other.failedHarvests + 1
	if attempts < failedEventsAttemptsLimit {
		events.failedHarvests = attempts
		events.Merge(other)
	}
}
// Merge offers every event stored in other to this pool through addEvent and
// sets numSeen to the sum of both pools' numSeen values.
func (events *analyticsEvents) Merge(other *analyticsEvents) {
	// addEvent increments numSeen once per stored event only, so the
	// combined total is captured up front and written back afterwards.
	combinedSeen := events.numSeen + other.numSeen

	for _, incoming := range other.events {
		events.addEvent(incoming)
	}

	events.numSeen = combinedSeen
}
// CollectorJSON serializes the pool as a three-element JSON array: the agent
// run ID, an object with "reservoir_size" and "events_seen", and an array of
// the stored events. It returns nil, nil when the pool holds no events.
func (events *analyticsEvents) CollectorJSON(agentRunID string) ([]byte, error) {
	count := len(events.events)
	if count == 0 {
		return nil, nil
	}

	// Pre-size the buffer with a rough guess of 256 bytes per event.
	buf := bytes.NewBuffer(make([]byte, 0, 256*count))

	// Outer array, starting with the agent run ID.
	buf.WriteByte('[')
	jsonx.AppendString(buf, agentRunID)
	buf.WriteByte(',')

	// Metadata object: reservoir capacity and total events seen.
	buf.WriteByte('{')
	buf.WriteString(`"reservoir_size":`)
	jsonx.AppendUint(buf, uint64(cap(events.events)))
	buf.WriteByte(',')
	buf.WriteString(`"events_seen":`)
	jsonx.AppendUint(buf, uint64(events.numSeen))
	buf.WriteByte('}')
	buf.WriteByte(',')

	// Event array, comma-separated.
	buf.WriteByte('[')
	for idx, ev := range events.events {
		if idx != 0 {
			buf.WriteByte(',')
		}
		ev.WriteJSON(buf)
	}
	buf.WriteByte(']')

	buf.WriteByte(']')
	return buf.Bytes(), nil
}
// split divides the stored events into two new pools: the first receives the
// leading half (rounded down) and the second receives the rest. NOTE! The
// returned pools are not valid priority queues; use them only to create
// JSON, never to add events.
func (events *analyticsEvents) split() (*analyticsEvents, *analyticsEvents) {
	total := len(events.events)
	half := total / 2

	// Allocate fresh slices instead of slicing events.events so that
	// length == capacity for each half.
	front := make(analyticsEventHeap, half)
	back := make(analyticsEventHeap, total-half)
	copy(front, events.events[:half])
	copy(back, events.events[half:])

	// numSeen is conserved: e1.numSeen + e2.numSeen == events.numSeen.
	e1 := &analyticsEvents{
		numSeen:        half,
		events:         front,
		failedHarvests: events.failedHarvests,
	}
	e2 := &analyticsEvents{
		numSeen:        events.numSeen - half,
		events:         back,
		failedHarvests: events.failedHarvests,
	}
	return e1, e2
}
|