1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
package fluxline
// DCSO fluxline
// Copyright (c) 2017, 2018, DCSO GmbH
import (
"fmt"
"io"
"reflect"
"sort"
"strings"
"time"
"github.com/Showmax/go-fqdn"
)
// Encoder represents a component that encapsulates a target environment for
// measurement submissions, as given by hostname and receiving writer.
type Encoder struct {
host string
Writer io.Writer
}
func escapeSpecialChars(in string) string {
str := strings.Replace(in, ",", `\,`, -1)
str = strings.Replace(str, "=", `\=`, -1)
str = strings.Replace(str, " ", `\ `, -1)
return str
}
func toInfluxRepr(tag string, val interface{}, nostatictypes bool) (string, error) {
switch v := val.(type) {
case string:
if len(v) > 64000 {
return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, len(v))
}
return fmt.Sprintf("%q", v), nil
case int32, int64, int16, int8, int, uint32, uint64, uint16, uint8, uint:
if nostatictypes {
return fmt.Sprintf("%d", v), nil
}
return fmt.Sprintf("%di", v), nil
case float64, float32:
return fmt.Sprintf("%g", v), nil
case bool:
return fmt.Sprintf("%t", v), nil
case time.Time:
return fmt.Sprintf("%d", uint64(v.UnixNano())), nil
default:
return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag)
}
}
func recordFields(val interface{},
fieldSet map[string]string, nostatictypes bool) (map[string]string, error) {
t := reflect.TypeOf(val)
v := reflect.ValueOf(val)
for i := 0; i < t.NumField(); i++ {
field := t.Field(i)
tag := field.Tag.Get("influx")
if tag == "" {
continue
}
repr, err := toInfluxRepr(tag, v.Field(i).Interface(), nostatictypes)
if err != nil {
return nil, err
}
fieldSet[tag] = repr
}
return fieldSet, nil
}
func (a *Encoder) formatLineProtocol(prefix string,
tags map[string]string, fieldSet map[string]string) string {
out := ""
tagstr := ""
// sort by key to obtain stable output order
keys := make([]string, 0, len(tags))
for key := range tags {
keys = append(keys, key)
}
sort.Strings(keys)
// serialize tags
for _, k := range keys {
tagstr += ","
tagstr += fmt.Sprintf("%s=%s", escapeSpecialChars(k), escapeSpecialChars(tags[k]))
}
// sort by key to obtain stable output order
keys = make([]string, 0, len(fieldSet))
for key := range fieldSet {
keys = append(keys, key)
}
sort.Strings(keys)
// serialize fields
first := true
for _, k := range keys {
if !first {
out += ","
} else {
first = false
}
out += fmt.Sprintf("%s=%s", escapeSpecialChars(k), fieldSet[k])
}
if out == "" {
return ""
}
// construct line protocol string
return fmt.Sprintf("%s,host=%s%s %s %d\n", prefix, a.host,
tagstr, out, uint64(time.Now().UnixNano()))
}
// Encode writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
func (a *Encoder) encodeGeneric(prefix string, val interface{},
tags map[string]string, nostatictypes bool) error {
fieldSet := make(map[string]string)
fieldSet, err := recordFields(val, fieldSet, nostatictypes)
if err != nil {
return err
}
_, err = a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, fieldSet)))
return err
}
// Encode writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
func (a *Encoder) Encode(prefix string, val interface{},
tags map[string]string) error {
return a.encodeGeneric(prefix, val, tags, false)
}
// EncodeWithoutTypes writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
// In contrast to Encode(), this method never appends type suffixes to values.
func (a *Encoder) EncodeWithoutTypes(prefix string, val interface{},
tags map[string]string) error {
return a.encodeGeneric(prefix, val, tags, true)
}
// EncodeMap writes the line protocol representation for a given measurement
// name, field value map and tag map to the io.Writer specified on encoder
// creation.
func (a *Encoder) EncodeMap(prefix string, val map[string]string,
tags map[string]string) error {
_, err := a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, val)))
return err
}
// NewEncoder creates a new encoder that writes to the given io.Writer.
func NewEncoder(w io.Writer) *Encoder {
a := &Encoder{
host: fqdn.Get(),
Writer: w,
}
return a
}
// NewEncoderWithHostname creates a new encoder that writes to the given
// io.Writer with an overridden hostname
func NewEncoderWithHostname(w io.Writer, host string) *Encoder {
a := &Encoder{
host: host,
Writer: w,
}
return a
}
|