File: weighted_histogram.go

package info (click to toggle)
golang-k8s-component-base 0.32.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,432 kB
  • sloc: makefile: 4
file content (203 lines) | stat: -rw-r--r-- 6,347 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
/*
Copyright 2022 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package prometheusextension

import (
	"fmt"
	"math"
	"sort"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// WeightedHistogram generalizes Histogram: each observation has
// an associated _weight_. For a given `x` and `N`,
// `1` call on `ObserveWithWeight(x, N)` has the same meaning as
// `N` calls on `ObserveWithWeight(x, 1)`.
// The weighted sum might differ slightly due to the use of
// floating point, although the implementation takes some steps
// to mitigate that.
// If every weight were 1,
// this would be the same as the existing Histogram abstraction.
type WeightedHistogram interface {
	prometheus.Metric
	prometheus.Collector
	WeightedObserver
}

// WeightedObserver generalizes the Observer interface.
type WeightedObserver interface {
	// Set the variable to the given value with the given weight.
	ObserveWithWeight(value float64, weight uint64)
}

// WeightedHistogramOpts is the same as for an ordinary Histogram
type WeightedHistogramOpts = prometheus.HistogramOpts

// NewWeightedHistogram creates a new WeightedHistogram
func NewWeightedHistogram(opts WeightedHistogramOpts) (WeightedHistogram, error) {
	desc := prometheus.NewDesc(
		prometheus.BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
		wrapWeightedHelp(opts.Help),
		nil,
		opts.ConstLabels,
	)
	return newWeightedHistogram(desc, opts)
}

func wrapWeightedHelp(given string) string {
	return "EXPERIMENTAL: " + given
}

func newWeightedHistogram(desc *prometheus.Desc, opts WeightedHistogramOpts, variableLabelValues ...string) (*weightedHistogram, error) {
	if len(opts.Buckets) == 0 {
		opts.Buckets = prometheus.DefBuckets
	}

	for i, upperBound := range opts.Buckets {
		if i < len(opts.Buckets)-1 {
			if upperBound >= opts.Buckets[i+1] {
				return nil, fmt.Errorf(
					"histogram buckets must be in increasing order: %f >= %f",
					upperBound, opts.Buckets[i+1],
				)
			}
		} else {
			if math.IsInf(upperBound, +1) {
				// The +Inf bucket is implicit. Remove it here.
				opts.Buckets = opts.Buckets[:i]
			}
		}
	}
	upperBounds := make([]float64, len(opts.Buckets))
	copy(upperBounds, opts.Buckets)

	return &weightedHistogram{
		desc:                desc,
		variableLabelValues: variableLabelValues,
		upperBounds:         upperBounds,
		buckets:             make([]uint64, len(upperBounds)+1),
		hotCount:            initialHotCount,
	}, nil
}

type weightedHistogram struct {
	desc                *prometheus.Desc
	variableLabelValues []string
	upperBounds         []float64 // exclusive of +Inf

	lock sync.Mutex // applies to all the following

	// buckets is longer by one than upperBounds.
	// For 0 <= idx < len(upperBounds), buckets[idx] holds the
	// accumulated time.Duration that value has been <=
	// upperBounds[idx] but not <= upperBounds[idx-1].
	// buckets[len(upperBounds)] holds the accumulated
	// time.Duration when value fit in no other bucket.
	buckets []uint64

	// sumHot + sumCold is the weighted sum of value.
	// Rather than risk loss of precision in one
	// float64, we do this sum hierarchically.  Many successive
	// increments are added into sumHot; once in a while
	// the magnitude of sumHot is compared to the magnitude
	// of sumCold and, if the ratio is high enough,
	// sumHot is transferred into sumCold.
	sumHot  float64
	sumCold float64

	transferThreshold float64 // = math.Abs(sumCold) / 2^26 (that's about half of the bits of precision in a float64)

	// hotCount is used to decide when to consider dumping sumHot into sumCold.
	// hotCount counts upward from initialHotCount to zero.
	hotCount int
}

// initialHotCount is the negative of the number of terms
// that are summed into sumHot before considering whether
// to transfer to sumCold.  This only has to be big enough
// to make the extra floating point operations occur in a
// distinct minority of cases.
const initialHotCount = -15

var _ WeightedHistogram = &weightedHistogram{}
var _ prometheus.Metric = &weightedHistogram{}
var _ prometheus.Collector = &weightedHistogram{}

func (sh *weightedHistogram) ObserveWithWeight(value float64, weight uint64) {
	idx := sort.SearchFloat64s(sh.upperBounds, value)
	sh.lock.Lock()
	defer sh.lock.Unlock()
	sh.updateLocked(idx, value, weight)
}

func (sh *weightedHistogram) observeWithWeightLocked(value float64, weight uint64) {
	idx := sort.SearchFloat64s(sh.upperBounds, value)
	sh.updateLocked(idx, value, weight)
}

func (sh *weightedHistogram) updateLocked(idx int, value float64, weight uint64) {
	sh.buckets[idx] += weight
	newSumHot := sh.sumHot + float64(weight)*value
	sh.hotCount++
	if sh.hotCount >= 0 {
		sh.hotCount = initialHotCount
		if math.Abs(newSumHot) > sh.transferThreshold {
			newSumCold := sh.sumCold + newSumHot
			sh.sumCold = newSumCold
			sh.transferThreshold = math.Abs(newSumCold / 67108864)
			sh.sumHot = 0
			return
		}
	}
	sh.sumHot = newSumHot
}

func (sh *weightedHistogram) Desc() *prometheus.Desc {
	return sh.desc
}

func (sh *weightedHistogram) Write(dest *dto.Metric) error {
	count, sum, buckets := func() (uint64, float64, map[float64]uint64) {
		sh.lock.Lock()
		defer sh.lock.Unlock()
		nBounds := len(sh.upperBounds)
		buckets := make(map[float64]uint64, nBounds)
		var count uint64
		for idx, upperBound := range sh.upperBounds {
			count += sh.buckets[idx]
			buckets[upperBound] = count
		}
		count += sh.buckets[nBounds]
		return count, sh.sumHot + sh.sumCold, buckets
	}()
	metric, err := prometheus.NewConstHistogram(sh.desc, count, sum, buckets, sh.variableLabelValues...)
	if err != nil {
		return err
	}
	return metric.Write(dest)
}

func (sh *weightedHistogram) Describe(ch chan<- *prometheus.Desc) {
	ch <- sh.desc
}

func (sh *weightedHistogram) Collect(ch chan<- prometheus.Metric) {
	ch <- sh
}