1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
|
package metrics
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"time"
"compress/gzip"
)
// InitPushProcessMetrics sets up periodic push for 'process_*' metrics to the given pushURL with the given interval.
//
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to pushURL.
//
// The metrics are pushed to pushURL in Prometheus text exposition format.
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
//
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
//
// It is OK calling InitPushProcessMetrics multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
func InitPushProcessMetrics(pushURL string, interval time.Duration, extraLabels string) error {
writeMetrics := func(w io.Writer) {
WriteProcessMetrics(w)
}
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
}
// InitPush sets up periodic push for globally registered metrics to the given pushURL with the given interval.
//
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to pushURL.
//
// If pushProcessMetrics is set to true, then 'process_*' metrics are also pushed to pushURL.
//
// The metrics are pushed to pushURL in Prometheus text exposition format.
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
//
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
//
// It is OK calling InitPush multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
func InitPush(pushURL string, interval time.Duration, extraLabels string, pushProcessMetrics bool) error {
writeMetrics := func(w io.Writer) {
WritePrometheus(w, pushProcessMetrics)
}
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
}
// InitPush sets up periodic push for metrics from s to the given pushURL with the given interval.
//
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to pushURL.
//
// / The metrics are pushed to pushURL in Prometheus text exposition format.
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
//
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
//
// It is OK calling InitPush multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
func (s *Set) InitPush(pushURL string, interval time.Duration, extraLabels string) error {
writeMetrics := func(w io.Writer) {
s.WritePrometheus(w)
}
return InitPushExt(pushURL, interval, extraLabels, writeMetrics)
}
// InitPushExt sets up periodic push for metrics obtained by calling writeMetrics with the given interval.
//
// extraLabels may contain comma-separated list of `label="value"` labels, which will be added
// to all the metrics before pushing them to pushURL.
//
// The writeMetrics callback must write metrics to w in Prometheus text exposition format without timestamps and trailing comments.
// See https://github.com/prometheus/docs/blob/main/content/docs/instrumenting/exposition_formats.md#text-based-format
//
// It is recommended pushing metrics to /api/v1/import/prometheus endpoint according to
// https://docs.victoriametrics.com/#how-to-import-data-in-prometheus-exposition-format
//
// It is OK calling InitPushExt multiple times with different pushURL -
// in this case metrics are pushed to all the provided pushURL urls.
//
// It is OK calling InitPushExt multiple times with different writeMetrics -
// in this case all the metrics generated by writeMetrics callbacks are writte to pushURL.
func InitPushExt(pushURL string, interval time.Duration, extraLabels string, writeMetrics func(w io.Writer)) error {
if interval <= 0 {
return fmt.Errorf("interval must be positive; got %s", interval)
}
if err := validateTags(extraLabels); err != nil {
return fmt.Errorf("invalid extraLabels=%q: %w", extraLabels, err)
}
pu, err := url.Parse(pushURL)
if err != nil {
return fmt.Errorf("cannot parse pushURL=%q: %w", pushURL, err)
}
if pu.Scheme != "http" && pu.Scheme != "https" {
return fmt.Errorf("unsupported scheme in pushURL=%q; expecting 'http' or 'https'", pushURL)
}
if pu.Host == "" {
return fmt.Errorf("missing host in pushURL=%q", pushURL)
}
pushURLRedacted := pu.Redacted()
c := &http.Client{
Timeout: interval,
}
pushesTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_total{url=%q}`, pushURLRedacted))
pushErrorsTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_errors_total{url=%q}`, pushURLRedacted))
bytesPushedTotal := pushMetrics.GetOrCreateCounter(fmt.Sprintf(`metrics_push_bytes_pushed_total{url=%q}`, pushURLRedacted))
pushDuration := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_duration_seconds{url=%q}`, pushURLRedacted))
pushBlockSize := pushMetrics.GetOrCreateHistogram(fmt.Sprintf(`metrics_push_block_size_bytes{url=%q}`, pushURLRedacted))
pushMetrics.GetOrCreateFloatCounter(fmt.Sprintf(`metrics_push_interval_seconds{url=%q}`, pushURLRedacted)).Set(interval.Seconds())
go func() {
ticker := time.NewTicker(interval)
var bb bytes.Buffer
var tmpBuf []byte
zw := gzip.NewWriter(&bb)
for range ticker.C {
bb.Reset()
writeMetrics(&bb)
if len(extraLabels) > 0 {
tmpBuf = addExtraLabels(tmpBuf[:0], bb.Bytes(), extraLabels)
bb.Reset()
if _, err := bb.Write(tmpBuf); err != nil {
panic(fmt.Errorf("BUG: cannot write %d bytes to bytes.Buffer: %s", len(tmpBuf), err))
}
}
tmpBuf = append(tmpBuf[:0], bb.Bytes()...)
bb.Reset()
zw.Reset(&bb)
if _, err := zw.Write(tmpBuf); err != nil {
panic(fmt.Errorf("BUG: cannot write %d bytes to gzip writer: %s", len(tmpBuf), err))
}
if err := zw.Close(); err != nil {
panic(fmt.Errorf("BUG: cannot flush metrics to gzip writer: %s", err))
}
pushesTotal.Inc()
blockLen := bb.Len()
bytesPushedTotal.Add(blockLen)
pushBlockSize.Update(float64(blockLen))
req, err := http.NewRequest("GET", pushURL, &bb)
if err != nil {
panic(fmt.Errorf("BUG: metrics.push: cannot initialize request for metrics push to %q: %w", pushURLRedacted, err))
}
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("Content-Encoding", "gzip")
startTime := time.Now()
resp, err := c.Do(req)
pushDuration.UpdateDuration(startTime)
if err != nil {
log.Printf("ERROR: metrics.push: cannot push metrics to %q: %s", pushURLRedacted, err)
pushErrorsTotal.Inc()
continue
}
if resp.StatusCode/100 != 2 {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
log.Printf("ERROR: metrics.push: unexpected status code in response from %q: %d; expecting 2xx; response body: %q",
pushURLRedacted, resp.StatusCode, body)
pushErrorsTotal.Inc()
continue
}
_ = resp.Body.Close()
}
}()
return nil
}
var pushMetrics = NewSet()
func writePushMetrics(w io.Writer) {
pushMetrics.WritePrometheus(w)
}
func addExtraLabels(dst, src []byte, extraLabels string) []byte {
for len(src) > 0 {
var line []byte
n := bytes.IndexByte(src, '\n')
if n >= 0 {
line = src[:n]
src = src[n+1:]
} else {
line = src
src = nil
}
line = bytes.TrimSpace(line)
if len(line) == 0 {
// Skip empy lines
continue
}
if bytes.HasPrefix(line, bashBytes) {
// Copy comments as is
dst = append(dst, line...)
dst = append(dst, '\n')
continue
}
n = bytes.IndexByte(line, '{')
if n >= 0 {
dst = append(dst, line[:n+1]...)
dst = append(dst, extraLabels...)
dst = append(dst, ',')
dst = append(dst, line[n+1:]...)
} else {
n = bytes.LastIndexByte(line, ' ')
if n < 0 {
panic(fmt.Errorf("BUG: missing whitespace between metric name and metric value in Prometheus text exposition line %q", line))
}
dst = append(dst, line[:n]...)
dst = append(dst, '{')
dst = append(dst, extraLabels...)
dst = append(dst, '}')
dst = append(dst, line[n:]...)
}
dst = append(dst, '\n')
}
return dst
}
var bashBytes = []byte("#")
|