File: exec.go

package info (click to toggle)
golang-collectd 0.3.0%2Bgit20181025.f80706d-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 312 kB
  • sloc: makefile: 3
file content (170 lines) | stat: -rw-r--r-- 4,012 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// Package exec implements tools to write plugins for collectd's "exec plugin"
// in Go.
package exec // import "collectd.org/exec"

import (
	"context"
	"log"
	"os"
	"strconv"
	"sync"
	"time"

	"collectd.org/api"
	"collectd.org/format"
)

// Putval is the dispatcher used by the exec package to print ValueLists.
var Putval = format.NewPutval(os.Stdout)

type valueCallback struct {
	callback func() api.Value
	vl       *api.ValueList
	done     chan bool
}

type voidCallback struct {
	callback func(context.Context, time.Duration)
	interval time.Duration
	done     chan bool
}

type callback interface {
	run(context.Context, *sync.WaitGroup)
	stop()
}

// Executor holds one or more callbacks which are called periodically.
type Executor struct {
	cb    []callback
	group sync.WaitGroup
}

// NewExecutor returns a pointer to a new Executor object.
func NewExecutor() *Executor {
	return &Executor{
		group: sync.WaitGroup{},
	}
}

// ValueCallback adds a simple "value" callback to the Executor. The callback
// only returns a Number, i.e. either a api.Gauge or api.Derive, and formatting
// and printing is done by the executor.
func (e *Executor) ValueCallback(callback func() api.Value, vl *api.ValueList) {
	e.cb = append(e.cb, valueCallback{
		callback: callback,
		vl:       vl,
		done:     make(chan bool),
	})
}

// VoidCallback adds a "complex" callback to the Executor. While the functions
// prototype is simpler, all the work has to be done by the callback, i.e. the
// callback needs to format and print the appropriate lines to "STDOUT".
// However, this allows cases in which the number of values reported varies,
// e.g. depending on the system the code is running on.
func (e *Executor) VoidCallback(callback func(context.Context, time.Duration), interval time.Duration) {
	e.cb = append(e.cb, voidCallback{
		callback: callback,
		interval: interval,
		done:     make(chan bool),
	})
}

// Run starts calling all callbacks periodically and blocks.
func (e *Executor) Run(ctx context.Context) {
	for _, cb := range e.cb {
		e.group.Add(1)
		go cb.run(ctx, &e.group)
	}

	e.group.Wait()
}

// Stop sends a signal to all callbacks to exit and returns. This unblocks
// "Run()" but does not block itself.
func (e *Executor) Stop() {
	for _, cb := range e.cb {
		cb.stop()
	}
}

func (cb valueCallback) run(ctx context.Context, g *sync.WaitGroup) {
	if cb.vl.Host == "" {
		cb.vl.Host = Hostname()
	}
	cb.vl.Interval = sanitizeInterval(cb.vl.Interval)
	cb.vl.Values = make([]api.Value, 1)

	ticker := time.NewTicker(cb.vl.Interval)

	for {
		select {
		case _ = <-ticker.C:
			cb.vl.Values[0] = cb.callback()
			cb.vl.Time = time.Now()
			Putval.Write(ctx, cb.vl)
		case _ = <-cb.done:
			g.Done()
			return
		}
	}
}

func (cb valueCallback) stop() {
	cb.done <- true
}

func (cb voidCallback) run(ctx context.Context, g *sync.WaitGroup) {
	ticker := time.NewTicker(sanitizeInterval(cb.interval))

	for {
		select {
		case _ = <-ticker.C:
			cb.callback(ctx, cb.interval)
		case _ = <-cb.done:
			g.Done()
			return
		}
	}
}

func (cb voidCallback) stop() {
	cb.done <- true
}

// Interval determines the default interval from the "COLLECTD_INTERVAL"
// environment variable. It falls back to 10s if the environment variable is
// unset or cannot be parsed.
func Interval() time.Duration {
	i, err := strconv.ParseFloat(os.Getenv("COLLECTD_INTERVAL"), 64)
	if err != nil {
		log.Printf("unable to determine default interval: %v", err)
		return time.Second * 10
	}

	return time.Duration(i * float64(time.Second))
}

// Hostname determines the hostname to use from the "COLLECTD_HOSTNAME"
// environment variable and falls back to os.Hostname() if it is unset. If that
// also fails an empty string is returned.
func Hostname() string {
	if h := os.Getenv("COLLECTD_HOSTNAME"); h != "" {
		return h
	}

	if h, err := os.Hostname(); err == nil {
		return h
	}

	return ""
}

func sanitizeInterval(in time.Duration) time.Duration {
	if in == time.Duration(0) {
		return Interval()
	}

	return in
}