1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
|
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res FilteredExemplarReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
values: make(map[attribute.Distinct]datapoint[N]),
start: now(),
}
}
// lastValue summarizes a set of measurements as the last one made.
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func() FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
}
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
s.Lock()
defer s.Unlock()
attr := s.limit.Attributes(fltrAttr, s.values)
d, ok := s.values[attr.Equivalent()]
if !ok {
d.res = s.newRes()
}
d.attrs = attr
d.value = value
d.res.Offer(ctx, value, droppedAttr)
s.values[attr.Equivalent()] = d
}
func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// TODO (#3006): This will use an unbounded amount of memory if there
// are unbounded number of attribute sets being aggregated. Attribute
// sets that become "stale" need to be forgotten so this will not
// overload the system.
*dest = gData
return n
}
// copyDpts copies the datapoints held by s into dest. The number of datapoints
// copied is returned.
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
n := len(s.values)
*dest = reset(*dest, n, n)
var i int
for _, v := range s.values {
(*dest)[i].Attributes = v.attrs
(*dest)[i].StartTime = s.start
(*dest)[i].Time = t
(*dest)[i].Value = v.value
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
i++
}
return n
}
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}
// precomputedLastValue summarizes a set of observations as the last one made.
type precomputedLastValue[N int64 | float64] struct {
*lastValue[N]
}
func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
// Update start time for delta temporality.
s.start = t
*dest = gData
return n
}
func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
t := now()
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
// the DataPoints is missed (better luck next time).
gData, _ := (*dest).(metricdata.Gauge[N])
s.Lock()
defer s.Unlock()
n := s.copyDpts(&gData.DataPoints, t)
// Do not report stale values.
clear(s.values)
*dest = gData
return n
}
|