1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
// Copyright (c) 2012-2016 Eli Janssen
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package statsd
import (
"bytes"
"fmt"
"sync"
"time"
)
var senderPool = newBufferPool()
// BufferedSender provides a buffered statsd udp, sending multiple
// metrics, where possible.
type BufferedSender struct {
sender Sender
flushBytes int
flushInterval time.Duration
// buffers
bufmx sync.Mutex
buffer *bytes.Buffer
bufs chan *bytes.Buffer
// lifecycle
runmx sync.RWMutex
shutdown chan chan error
running bool
}
// Send bytes.
func (s *BufferedSender) Send(data []byte) (int, error) {
s.runmx.RLock()
if !s.running {
s.runmx.RUnlock()
return 0, fmt.Errorf("BufferedSender is not running")
}
s.withBufferLock(func() {
blen := s.buffer.Len()
if blen > 0 && blen+len(data)+1 >= s.flushBytes {
s.swapnqueue()
}
s.buffer.Write(data)
s.buffer.WriteByte('\n')
if s.buffer.Len() >= s.flushBytes {
s.swapnqueue()
}
})
s.runmx.RUnlock()
return len(data), nil
}
// Close closes the Buffered Sender and cleans up.
func (s *BufferedSender) Close() error {
// since we are running, write lock during cleanup
s.runmx.Lock()
defer s.runmx.Unlock()
if !s.running {
return nil
}
errChan := make(chan error)
s.running = false
s.shutdown <- errChan
return <-errChan
}
// Start Buffered Sender
// Begins ticker and read loop
func (s *BufferedSender) Start() {
// write lock to start running
s.runmx.Lock()
defer s.runmx.Unlock()
if s.running {
return
}
s.running = true
s.bufs = make(chan *bytes.Buffer, 32)
go s.run()
}
func (s *BufferedSender) withBufferLock(fn func()) {
s.bufmx.Lock()
fn()
s.bufmx.Unlock()
}
func (s *BufferedSender) swapnqueue() {
if s.buffer.Len() == 0 {
return
}
ob := s.buffer
nb := senderPool.Get()
s.buffer = nb
s.bufs <- ob
}
func (s *BufferedSender) run() {
ticker := time.NewTicker(s.flushInterval)
defer ticker.Stop()
doneChan := make(chan bool)
go func() {
for buf := range s.bufs {
s.flush(buf)
senderPool.Put(buf)
}
doneChan <- true
}()
for {
select {
case <-ticker.C:
s.withBufferLock(func() {
s.swapnqueue()
})
case errChan := <-s.shutdown:
s.withBufferLock(func() {
s.swapnqueue()
})
close(s.bufs)
<-doneChan
errChan <- s.sender.Close()
return
}
}
}
// send to remove endpoint and truncate buffer
func (s *BufferedSender) flush(b *bytes.Buffer) (int, error) {
bb := b.Bytes()
bbl := len(bb)
if bb[bbl-1] == '\n' {
bb = bb[:bbl-1]
}
//n, err := s.sender.Send(bytes.TrimSuffix(b.Bytes(), []byte("\n")))
n, err := s.sender.Send(bb)
b.Truncate(0) // clear the buffer
return n, err
}
// NewBufferedSender returns a new BufferedSender
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr.
//
// flushInterval is a time.Duration, and specifies the maximum interval for
// packet sending. Note that if you send lots of metrics, you will send more
// often. This is just a maximal threshold.
//
// flushBytes specifies the maximum udp packet size you wish to send. If adding
// a metric would result in a larger packet than flushBytes, the packet will
// first be send, then the new data will be added to the next packet.
func NewBufferedSender(addr string, flushInterval time.Duration, flushBytes int) (Sender, error) {
simpleSender, err := NewSimpleSender(addr)
if err != nil {
return nil, err
}
return NewBufferedSenderWithSender(simpleSender, flushInterval, flushBytes)
}
// NewBufferedSenderWithSender returns a new BufferedSender, wrapping the
// provided sender.
//
// sender is an instance of a statsd.Sender interface. Sender is required.
//
// flushInterval is a time.Duration, and specifies the maximum interval for
// packet sending. Note that if you send lots of metrics, you will send more
// often. This is just a maximal threshold.
//
// flushBytes specifies the maximum udp packet size you wish to send. If adding
// a metric would result in a larger packet than flushBytes, the packet will
// first be send, then the new data will be added to the next packet.
func NewBufferedSenderWithSender(sender Sender, flushInterval time.Duration, flushBytes int) (Sender, error) {
if sender == nil {
return nil, fmt.Errorf("sender may not be nil")
}
bufSender := &BufferedSender{
flushBytes: flushBytes,
flushInterval: flushInterval,
sender: sender,
buffer: senderPool.Get(),
shutdown: make(chan chan error),
}
bufSender.Start()
return bufSender, nil
}
|