File: encoder.go

package info (click to toggle)
golang-github-dcso-fluxline 0.0~git20200907.78686e5-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, bullseye-backports
  • size: 88 kB
  • sloc: makefile: 16
file content (172 lines) | stat: -rw-r--r-- 4,686 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package fluxline

// DCSO fluxline
// Copyright (c) 2017, 2018, DCSO GmbH

import (
	"fmt"
	"io"
	"reflect"
	"sort"
	"strings"
	"time"

	"github.com/Showmax/go-fqdn"
)

// Encoder represents a component that encapsulates a target environment for
// measurement submissions, as given by hostname and receiving writer.
type Encoder struct {
	host   string
	Writer io.Writer
}

func escapeSpecialChars(in string) string {
	str := strings.Replace(in, ",", `\,`, -1)
	str = strings.Replace(str, "=", `\=`, -1)
	str = strings.Replace(str, " ", `\ `, -1)
	return str
}

func toInfluxRepr(tag string, val interface{}, nostatictypes bool) (string, error) {
	switch v := val.(type) {
	case string:
		if len(v) > 64000 {
			return "", fmt.Errorf("%s: string too long (%d characters, max. 64K)", tag, len(v))
		}
		return fmt.Sprintf("%q", v), nil
	case int32, int64, int16, int8, int, uint32, uint64, uint16, uint8, uint:
		if nostatictypes {
			return fmt.Sprintf("%d", v), nil
		}
		return fmt.Sprintf("%di", v), nil
	case float64, float32:
		return fmt.Sprintf("%g", v), nil
	case bool:
		return fmt.Sprintf("%t", v), nil
	case time.Time:
		return fmt.Sprintf("%d", uint64(v.UnixNano())), nil
	default:
		return "", fmt.Errorf("%s: unsupported type for Influx Line Protocol", tag)
	}
}

func recordFields(val interface{},
	fieldSet map[string]string, nostatictypes bool) (map[string]string, error) {
	t := reflect.TypeOf(val)
	v := reflect.ValueOf(val)

	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		tag := field.Tag.Get("influx")
		if tag == "" {
			continue
		}
		repr, err := toInfluxRepr(tag, v.Field(i).Interface(), nostatictypes)
		if err != nil {
			return nil, err
		}
		fieldSet[tag] = repr
	}
	return fieldSet, nil
}

func (a *Encoder) formatLineProtocol(prefix string,
	tags map[string]string, fieldSet map[string]string) string {
	out := ""
	tagstr := ""

	// sort by key to obtain stable output order
	keys := make([]string, 0, len(tags))
	for key := range tags {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// serialize tags
	for _, k := range keys {
		tagstr += ","
		tagstr += fmt.Sprintf("%s=%s", escapeSpecialChars(k), escapeSpecialChars(tags[k]))
	}

	// sort by key to obtain stable output order
	keys = make([]string, 0, len(fieldSet))
	for key := range fieldSet {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// serialize fields
	first := true
	for _, k := range keys {
		if !first {
			out += ","
		} else {
			first = false
		}
		out += fmt.Sprintf("%s=%s", escapeSpecialChars(k), fieldSet[k])
	}
	if out == "" {
		return ""
	}

	// construct line protocol string
	return fmt.Sprintf("%s,host=%s%s %s %d\n", prefix, a.host,
		tagstr, out, uint64(time.Now().UnixNano()))
}

// Encode writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
func (a *Encoder) encodeGeneric(prefix string, val interface{},
	tags map[string]string, nostatictypes bool) error {
	fieldSet := make(map[string]string)
	fieldSet, err := recordFields(val, fieldSet, nostatictypes)
	if err != nil {
		return err
	}
	_, err = a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, fieldSet)))
	return err
}

// Encode writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
func (a *Encoder) Encode(prefix string, val interface{},
	tags map[string]string) error {
	return a.encodeGeneric(prefix, val, tags, false)
}

// EncodeWithoutTypes writes the line protocol representation for a given measurement
// name, data struct and tag map to the io.Writer specified on encoder creation.
// In contrast to Encode(), this method never appends type suffixes to values.
func (a *Encoder) EncodeWithoutTypes(prefix string, val interface{},
	tags map[string]string) error {
	return a.encodeGeneric(prefix, val, tags, true)
}

// EncodeMap writes the line protocol representation for a given measurement
// name, field value map and tag map to the io.Writer specified on encoder
// creation.
func (a *Encoder) EncodeMap(prefix string, val map[string]string,
	tags map[string]string) error {
	_, err := a.Writer.Write([]byte(a.formatLineProtocol(prefix, tags, val)))
	return err
}

// NewEncoder creates a new encoder that writes to the given io.Writer.
func NewEncoder(w io.Writer) *Encoder {
	a := &Encoder{
		host:   fqdn.Get(),
		Writer: w,
	}
	return a
}

// NewEncoderWithHostname creates a new encoder that writes to the given
// io.Writer with an overridden hostname
func NewEncoderWithHostname(w io.Writer, host string) *Encoder {
	a := &Encoder{
		host:   host,
		Writer: w,
	}
	return a
}