// Copyright (c) 2012-2016 Eli Janssen
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.
package statsd
import (
"errors"
"fmt"
"net"
"sync"
"time"
)
// ResolvingSimpleSender provides a socket send interface that re-resolves and
// reconnects.
//
// Payloads are written over conn to addrResolved. While the sender is running,
// a background loop re-resolves addrUnresolved every reresolveInterval and
// swaps in the new address when it differs from the current one.
type ResolvingSimpleSender struct {
// underlying packet connection that Send writes to and Close closes
conn net.PacketConn
// most recently resolved UDP destination; replaced by Reconnect under mx
addrResolved *net.UDPAddr
// address string as supplied by the caller, re-resolved by Reconnect
addrUnresolved string
// period of the re-resolve ticker
reresolveInterval time.Duration
// lifecycle
// mx guards running and addrResolved
mx sync.RWMutex
// doneChan is closed by Close to stop the background loop
doneChan chan struct{}
// running is set by Start and cleared by Close
running bool
}
// Send writes data to the currently resolved server address and returns the
// number of bytes written.
//
// An error is returned when the sender is not running, when the underlying
// write fails (the byte count is then reported as 0), or when the write
// succeeded but transferred zero bytes.
func (s *ResolvingSimpleSender) Send(data []byte) (int, error) {
	s.mx.RLock()
	if !s.running {
		s.mx.RUnlock()
		return 0, fmt.Errorf("ResolvingSimpleSender is not running")
	}

	// Only a read lock is needed: concurrent writes are already serialized
	// by the connection itself, the lock just pins running and the
	// destination address for the duration of the write.
	udpConn := s.conn.(*net.UDPConn)
	written, writeErr := udpConn.WriteToUDP(data, s.addrResolved)

	// Released by hand, and as early as possible, instead of via defer to
	// keep the per-send overhead down.
	s.mx.RUnlock()

	switch {
	case writeErr != nil:
		return 0, writeErr
	case written == 0:
		return written, errors.New("Wrote no bytes")
	default:
		return written, nil
	}
}
// Close shuts the sender down: it marks it as not running, closes doneChan to
// stop the background re-resolve loop, and closes the underlying connection.
//
// It returns the error from closing the connection. Calling Close on a sender
// that is not running does nothing and returns nil.
func (s *ResolvingSimpleSender) Close() error {
	// Exclusive lock so that neither an in-flight Send nor an address swap
	// in Reconnect overlaps with teardown.
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.running {
		s.running = false
		close(s.doneChan)
		return s.conn.Close()
	}
	return nil
}
// Reconnect re-resolves the unresolved address and, when the result differs
// from the address currently in use, switches the sender over to it.
//
// It does nothing if the sender is not running, if resolution fails, or if
// the resolved address is unchanged; in all of those cases the existing
// address stays in effect.
func (s *ResolvingSimpleSender) Reconnect() {
	// Snapshot the state under the read lock and let go of it right away so
	// that senders are not held up by the (potentially slow) lookup below.
	s.mx.RLock()
	active := s.running
	var previous string
	if active {
		previous = s.addrResolved.String()
	}
	s.mx.RUnlock()

	if !active {
		return
	}

	// addrUnresolved is never modified after construction, so it can be
	// read without holding the lock.
	fresh, err := net.ResolveUDPAddr("udp", s.addrUnresolved)
	if err != nil || fresh.String() == previous {
		// Nothing usable or nothing new: keep sending to the old address.
		return
	}

	// The write lock protects the swap itself, and running is re-checked
	// because the sender may have been closed while the lookup was running.
	s.mx.Lock()
	defer s.mx.Unlock()
	if s.running {
		s.addrResolved = fresh
	}
}
// Start marks the sender as running and launches the background loop that
// periodically re-resolves the server address.
//
// Calling Start on a sender that is already running is a no-op. A sender that
// has been closed cannot be restarted: Close closes both doneChan and the
// underlying connection, so Start leaves such a sender stopped.
func (s *ResolvingSimpleSender) Start() {
	// write lock to start running
	s.mx.Lock()
	defer s.mx.Unlock()

	if s.running {
		return
	}

	// Once doneChan is closed the sender has been through Close. Flipping
	// running back on would leave Send writing to a closed connection, the
	// loop exiting immediately, and the next Close panicking on a second
	// close of doneChan. (A nil doneChan never becomes ready, so the default
	// branch is taken.)
	select {
	case <-s.doneChan:
		return
	default:
	}

	s.running = true
	go s.run()
}
// run is the background loop launched by Start. Every reresolveInterval it
// calls Reconnect, and it returns once doneChan is closed.
//
// A non-positive reresolveInterval disables periodic re-resolution: the loop
// then only waits for shutdown.
func (s *ResolvingSimpleSender) run() {
	// time.NewTicker panics on a non-positive duration, and a panic in this
	// goroutine would bring down the whole process, so handle that case
	// without creating a ticker.
	if s.reresolveInterval <= 0 {
		<-s.doneChan
		return
	}

	ticker := time.NewTicker(s.reresolveInterval)
	defer ticker.Stop()

	for {
		select {
		case <-s.doneChan:
			return
		case <-ticker.C:
			// reconnect locks/checks running, so no need to do it here
			s.Reconnect()
		}
	}
}
// NewResolvingSimpleSender returns a new, already started
// ResolvingSimpleSender for sending to the supplied address.
//
// addr is a string of the format "hostname:port", and must be parsable by
// net.ResolveUDPAddr. interval is stored as the sender's re-resolve interval.
//
// An error is returned if the local UDP socket cannot be opened or if addr
// cannot be resolved; in the latter case the socket is closed again first.
func NewResolvingSimpleSender(addr string, interval time.Duration) (Sender, error) {
	// Bind an ephemeral local port to send from.
	packetConn, listenErr := net.ListenPacket("udp", ":0")
	if listenErr != nil {
		return nil, listenErr
	}

	target, resolveErr := net.ResolveUDPAddr("udp", addr)
	if resolveErr != nil {
		// Don't leak the socket when the destination is unusable.
		packetConn.Close()
		return nil, resolveErr
	}

	s := new(ResolvingSimpleSender)
	s.conn = packetConn
	s.addrResolved = target
	s.addrUnresolved = addr
	s.reresolveInterval = interval
	s.doneChan = make(chan struct{})
	s.running = false

	s.Start()
	return s, nil
}
|