1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
|
package metrics
import (
"io"
"strconv"
"time"
)
// StatsDPackMax is the maximum packet size for batches.
// Be carefull not to exceed the network's MTU. See
// https://github.com/etsy/statsd/blob/master/docs/metric_types.md#multi-metric-packets
var StatsDPackMax = 1432
type statsD struct {
prefix string
// pool holds buffers to prevent mallocs at runtime.
pool chan []byte
// queue holds the pending messages.
queue chan []byte
}
// NewStatsD returns a new Register with a StatsD implementation.
// Generally speaking, you want to connect over UDP.
//
// conn, err := net.DialTimeout("udp", "localhost:8125", 4*time.Second)
// if err != nil {
// ...
// }
// stats := metrics.NewStatsD(conn, time.Second)
//
// Batches are limited by the flushInterval span. A flushInterval of 0 implies no buffering.
func NewStatsD(conn io.Writer, flushInterval time.Duration) Register {
d := new(statsD)
{
size := 1000
d.queue = make(chan []byte, size)
d.pool = make(chan []byte, size)
for i := 0; i < size; i++ {
d.pool <- make([]byte, 0, 40)
}
}
// write statsD.queue to conn
go func() {
buf := make([]byte, 0, StatsDPackMax)
var batchStart time.Time
for {
// flush on interval timeout
if len(buf) > 0 && time.Since(batchStart) >= flushInterval {
conn.Write(buf)
buf = buf[:0]
continue
}
var next []byte
select {
case next = <-d.queue:
default:
time.Sleep(time.Millisecond)
continue
}
// flush first when size limit reached
if len(buf)+1+len(next) > StatsDPackMax {
conn.Write(buf)
buf = buf[:0]
}
if len(buf) != 0 {
buf = append(buf, '\n')
} else {
batchStart = time.Now()
}
buf = append(buf, next...)
// reuse buffer
d.pool <- next[:0]
}
}()
return d
}
func (d *statsD) Seen(key string, n int) {
buf := <-d.pool
buf = append(buf, d.prefix...)
buf = append(buf, key...)
buf = append(buf, ':')
switch {
case n < 0 || n >= 100:
buf = append(buf, strconv.Itoa(n)...)
case n < 10:
buf = append(buf, byte('0' + n))
default:
buf = append(buf, byte('0' + (n / 10)), byte('0' + (n % 10)))
}
buf = append(buf, '|', 'c')
d.queue <- buf
}
func (d *statsD) Took(key string, since time.Time) {
buf := <-d.pool
buf = append(buf, d.prefix...)
buf = append(buf, key...)
buf = append(buf, ':')
n := int64(time.Since(since)/time.Millisecond)
switch {
case n < 0 || n >= 100:
buf = append(buf, strconv.FormatInt(n, 10)...)
case n < 10:
buf = append(buf, byte('0' + n))
default:
buf = append(buf, byte('0' + (n / 10)), byte('0' + (n % 10)))
}
buf = append(buf, '|', 'm', 's')
d.queue <- buf
}
func (d *statsD) KeyPrefix(s string) {
d.prefix = s
}
|