File: statsd.go

package info (click to toggle)
golang-github-pascaldekloe-goe 0.1.0-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 168 kB
  • sloc: makefile: 3
file content (125 lines) | stat: -rw-r--r-- 2,694 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package metrics

import (
	"io"
	"strconv"
	"time"
)

// StatsDPackMax is the maximum packet size for batches.
// Be careful not to exceed the network's MTU. See
// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
//
// NewStatsD reads the value when it allocates its batch buffer and again
// for every queued message, without synchronization.
var StatsDPackMax = 1432

// statsD is the Register implementation returned by NewStatsD.
// Messages are built in buffers taken from pool and handed over
// to the writer goroutine through queue.
type statsD struct {
	// prefix is prepended to every key by Seen and Took.
	prefix string
	// pool holds buffers to prevent mallocs at runtime.
	pool chan []byte
	// queue holds the pending messages.
	queue chan []byte
}

// NewStatsD returns a new Register with a StatsD implementation.
// Generally speaking, you want to connect over UDP.
//
//	conn, err := net.DialTimeout("udp", "localhost:8125", 4*time.Second)
//	if err != nil {
//		...
//	}
//	stats := metrics.NewStatsD(conn, time.Second)
//
// Batches are limited by the flushInterval span. A flushInterval of 0 implies no buffering.
//
// Messages are joined with a newline into packets of at most StatsDPackMax
// bytes. A single message that exceeds the limit on its own is sent as a
// separate packet. Write errors on conn are ignored because there is no
// way to report them to the caller. The goroutine started here never
// terminates.
func NewStatsD(conn io.Writer, flushInterval time.Duration) Register {
	d := new(statsD)
	{
		size := 1000
		d.queue = make(chan []byte, size)
		d.pool = make(chan []byte, size)
		for i := 0; i < size; i++ {
			d.pool <- make([]byte, 0, 40)
		}
	}

	// write statsD.queue to conn
	go func() {
		buf := make([]byte, 0, StatsDPackMax)
		var batchStart time.Time

		// flush sends the pending batch and resets the buffer for reuse.
		flush := func() {
			// Best effort: metrics are fire-and-forget and there is
			// nobody to hand the error to.
			_, _ = conn.Write(buf)
			buf = buf[:0]
		}

		for {
			// flush on interval timeout
			if len(buf) > 0 && time.Since(batchStart) >= flushInterval {
				flush()
				continue
			}

			var next []byte
			if len(buf) == 0 {
				// Nothing is pending, so there is no flush deadline
				// to watch: block instead of polling.
				next = <-d.queue
			} else {
				select {
				case next = <-d.queue:
				default:
					// Poll such that the interval check above
					// runs about once every millisecond.
					time.Sleep(time.Millisecond)
					continue
				}
			}

			// Flush first when the size limit would be exceeded. An empty
			// batch is never written: that would emit an empty packet.
			if len(buf) > 0 && len(buf)+1+len(next) > StatsDPackMax {
				flush()
			}

			if len(buf) != 0 {
				buf = append(buf, '\n')
			} else {
				batchStart = time.Now()
			}
			buf = append(buf, next...)

			// reuse buffer
			d.pool <- next[:0]
		}
	}()

	return d
}

// Seen queues a StatsD counter message of the form "<prefix><key>:<n>|c".
// The call blocks while no pooled buffer is available or the queue is full.
func (d *statsD) Seen(key string, n int) {
	buf := <-d.pool
	buf = append(buf, d.prefix...)
	buf = append(buf, key...)
	buf = append(buf, ':')

	// AppendInt writes the decimal digits straight into the pooled
	// buffer, so no intermediate string is allocated.
	buf = strconv.AppendInt(buf, int64(n), 10)

	buf = append(buf, '|', 'c')
	d.queue <- buf
}

// Took queues a StatsD timing message of the form "<prefix><key>:<n>|ms",
// where n is the time elapsed since the given instant in whole milliseconds.
// The call blocks while no pooled buffer is available or the queue is full.
func (d *statsD) Took(key string, since time.Time) {
	buf := <-d.pool
	buf = append(buf, d.prefix...)
	buf = append(buf, key...)
	buf = append(buf, ':')

	// AppendInt writes the decimal digits straight into the pooled
	// buffer, so no intermediate string is allocated.
	n := int64(time.Since(since) / time.Millisecond)
	buf = strconv.AppendInt(buf, n, 10)

	buf = append(buf, '|', 'm', 's')
	d.queue <- buf
}

// KeyPrefix sets the string that Seen and Took put in front of every key.
// The field is written without synchronization.
func (d *statsD) KeyPrefix(s string) {
	d.prefix = s
}