1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
|
// Copyright 2020 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
)
const maxSamplesInMemory = 5000
type queryRangeAPI interface {
QueryRange(ctx context.Context, query string, r v1.Range, opts ...v1.Option) (model.Value, v1.Warnings, error)
}
type ruleImporter struct {
logger log.Logger
config ruleImporterConfig
apiClient queryRangeAPI
groups map[string]*rules.Group
ruleManager *rules.Manager
}
type ruleImporterConfig struct {
outputDir string
start time.Time
end time.Time
evalInterval time.Duration
maxBlockDuration time.Duration
}
// newRuleImporter creates a new rule importer that can be used to parse and evaluate recording rule files and create new series
// written to disk in blocks.
func newRuleImporter(logger log.Logger, config ruleImporterConfig, apiClient queryRangeAPI) *ruleImporter {
level.Info(logger).Log("backfiller", "new rule importer", "start", config.start.Format(time.RFC822), "end", config.end.Format(time.RFC822))
return &ruleImporter{
logger: logger,
config: config,
apiClient: apiClient,
ruleManager: rules.NewManager(&rules.ManagerOptions{}),
}
}
// loadGroups parses groups from a list of recording rule files.
func (importer *ruleImporter) loadGroups(_ context.Context, filenames []string) (errs []error) {
groups, errs := importer.ruleManager.LoadGroups(importer.config.evalInterval, labels.Labels{}, "", nil, filenames...)
if errs != nil {
return errs
}
importer.groups = groups
return nil
}
// importAll evaluates all the recording rules and creates new time series and writes them to disk in blocks.
func (importer *ruleImporter) importAll(ctx context.Context) (errs []error) {
for name, group := range importer.groups {
level.Info(importer.logger).Log("backfiller", "processing group", "name", name)
for i, r := range group.Rules() {
level.Info(importer.logger).Log("backfiller", "processing rule", "id", i, "name", r.Name())
if err := importer.importRule(ctx, r.Query().String(), r.Name(), r.Labels(), importer.config.start, importer.config.end, int64(importer.config.maxBlockDuration/time.Millisecond), group); err != nil {
errs = append(errs, err)
}
}
}
return errs
}
// importRule queries a prometheus API to evaluate rules at times in the past.
func (importer *ruleImporter) importRule(ctx context.Context, ruleExpr, ruleName string, ruleLabels labels.Labels, start, end time.Time,
maxBlockDuration int64, grp *rules.Group,
) (err error) {
blockDuration := getCompatibleBlockDuration(maxBlockDuration)
startInMs := start.Unix() * int64(time.Second/time.Millisecond)
endInMs := end.Unix() * int64(time.Second/time.Millisecond)
for startOfBlock := blockDuration * (startInMs / blockDuration); startOfBlock <= endInMs; startOfBlock += blockDuration {
endOfBlock := startOfBlock + blockDuration - 1
currStart := max(startOfBlock/int64(time.Second/time.Millisecond), start.Unix())
startWithAlignment := grp.EvalTimestamp(time.Unix(currStart, 0).UTC().UnixNano())
for startWithAlignment.Unix() < currStart {
startWithAlignment = startWithAlignment.Add(grp.Interval())
}
end := time.Unix(min(endOfBlock/int64(time.Second/time.Millisecond), end.Unix()), 0).UTC()
if end.Before(startWithAlignment) {
break
}
val, warnings, err := importer.apiClient.QueryRange(ctx,
ruleExpr,
v1.Range{
Start: startWithAlignment,
End: end,
Step: grp.Interval(),
},
)
if err != nil {
return fmt.Errorf("query range: %w", err)
}
if warnings != nil {
level.Warn(importer.logger).Log("msg", "Range query returned warnings.", "warnings", warnings)
}
// To prevent races with compaction, a block writer only allows appending samples
// that are at most half a block size older than the most recent sample appended so far.
// However, in the way we use the block writer here, compaction doesn't happen, while we
// also need to append samples throughout the whole block range. To allow that, we
// pretend that the block is twice as large here, but only really add sample in the
// original interval later.
w, err := tsdb.NewBlockWriter(log.NewNopLogger(), importer.config.outputDir, 2*blockDuration)
if err != nil {
return fmt.Errorf("new block writer: %w", err)
}
var closed bool
defer func() {
if !closed {
err = tsdb_errors.NewMulti(err, w.Close()).Err()
}
}()
app := newMultipleAppender(ctx, w)
var matrix model.Matrix
switch val.Type() {
case model.ValMatrix:
matrix = val.(model.Matrix)
for _, sample := range matrix {
lb := labels.NewBuilder(labels.Labels{})
for name, value := range sample.Metric {
lb.Set(string(name), string(value))
}
// Setting the rule labels after the output of the query,
// so they can override query output.
ruleLabels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})
lb.Set(labels.MetricName, ruleName)
lbls := lb.Labels()
for _, value := range sample.Values {
if err := app.add(ctx, lbls, timestamp.FromTime(value.Timestamp.Time()), float64(value.Value)); err != nil {
return fmt.Errorf("add: %w", err)
}
}
}
default:
return fmt.Errorf("rule result is wrong type %s", val.Type().String())
}
if err := app.flushAndCommit(ctx); err != nil {
return fmt.Errorf("flush and commit: %w", err)
}
err = tsdb_errors.NewMulti(err, w.Close()).Err()
closed = true
}
return err
}
func newMultipleAppender(ctx context.Context, blockWriter *tsdb.BlockWriter) *multipleAppender {
return &multipleAppender{
maxSamplesInMemory: maxSamplesInMemory,
writer: blockWriter,
appender: blockWriter.Appender(ctx),
}
}
// multipleAppender keeps track of how many series have been added to the current appender.
// If the max samples have been added, then all series are committed and a new appender is created.
type multipleAppender struct {
maxSamplesInMemory int
currentSampleCount int
writer *tsdb.BlockWriter
appender storage.Appender
}
func (m *multipleAppender) add(ctx context.Context, l labels.Labels, t int64, v float64) error {
if _, err := m.appender.Append(0, l, t, v); err != nil {
return fmt.Errorf("multiappender append: %w", err)
}
m.currentSampleCount++
if m.currentSampleCount >= m.maxSamplesInMemory {
return m.commit(ctx)
}
return nil
}
func (m *multipleAppender) commit(ctx context.Context) error {
if m.currentSampleCount == 0 {
return nil
}
if err := m.appender.Commit(); err != nil {
return fmt.Errorf("multiappender commit: %w", err)
}
m.appender = m.writer.Appender(ctx)
m.currentSampleCount = 0
return nil
}
func (m *multipleAppender) flushAndCommit(ctx context.Context) error {
if err := m.commit(ctx); err != nil {
return err
}
if _, err := m.writer.Flush(ctx); err != nil {
return fmt.Errorf("multiappender flush: %w", err)
}
return nil
}
|