1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
|
package util
// DCSO FEVER
// Copyright (c) 2017, DCSO GmbH
import (
"bytes"
"strings"
"sync"
"time"
"github.com/DCSO/fluxline"
log "github.com/sirupsen/logrus"
)
// PerformanceStatsEncoder is a component to collect, encode and submit data
// to an InfluxDb via RabbitMQ.
type PerformanceStatsEncoder struct {
sync.RWMutex
Encoder *fluxline.Encoder
Buffer bytes.Buffer
Logger *log.Entry
Tags map[string]string
Submitter StatsSubmitter
SubmitPeriod time.Duration
LastSubmitted time.Time
DummyMode bool
}
// MakePerformanceStatsEncoder creates a new stats encoder, submitting via
// the given StatsSubmitter, with at least submitPeriod time between submissions.
// if dummyMode is set, then the result will be printed to stdout instead of
// submitting.
func MakePerformanceStatsEncoder(statsSubmitter StatsSubmitter,
submitPeriod time.Duration, dummyMode bool) *PerformanceStatsEncoder {
a := &PerformanceStatsEncoder{
Logger: log.WithFields(log.Fields{
"domain": "statscollect",
}),
Submitter: statsSubmitter,
DummyMode: dummyMode,
Tags: make(map[string]string),
LastSubmitted: time.Now(),
SubmitPeriod: submitPeriod,
}
a.Encoder = fluxline.NewEncoder(&a.Buffer)
return a
}
// SubmitWithTags encodes the data annotated with 'influx' tags in the passed
// struct and sends it to the configured submitter. This version also allows to
// add a set of user-defined tags as a key-value map.
func (a *PerformanceStatsEncoder) SubmitWithTags(val interface{}, tags map[string]string) {
a.Lock()
a.Buffer.Reset()
err := a.Encoder.EncodeWithoutTypes(ToolName, val, tags)
if err != nil {
if a.Logger != nil {
a.Logger.WithFields(log.Fields{}).Warn(err)
}
}
line := strings.TrimSpace(a.Buffer.String())
if line == "" {
a.Logger.WithFields(log.Fields{}).Warn("skipping empty influx line")
a.Unlock()
return
}
jsonString := []byte(line)
a.Submitter.SubmitWithHeaders(jsonString, "", "text/plain", map[string]string{
"database": "telegraf",
"retention_policy": "default",
})
a.Unlock()
}
// Submit encodes the data annotated with 'influx' tags in the passed struct and
// sends it to the configured submitter.
func (a *PerformanceStatsEncoder) Submit(val interface{}) {
a.SubmitWithTags(val, a.Tags)
}
|