1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
package network // import "collectd.org/network"
import (
"context"
"log"
"net"
"collectd.org/api"
)
// ListenAndWrite is a convenience wrapper for running a minimally configured
// server: it listens on the given UDP address, parses the packets it receives
// and hands the resulting value lists to d.
//
// It builds a Server with only Addr and Writer set and returns whatever that
// server's ListenAndWrite method returns. Use the Server type directly when
// more control is required.
func ListenAndWrite(ctx context.Context, address string, d api.Writer) error {
	return (&Server{Addr: address, Writer: d}).ListenAndWrite(ctx)
}
// Server bundles the parameters used to run a collectd server via its
// ListenAndWrite method.
type Server struct {
	// Conn is the UDP connection the server listens on. When Conn is nil,
	// ListenAndWrite opens a new server connection itself. ListenAndWrite
	// closes the connection before it returns.
	Conn *net.UDPConn

	// Addr is the address to listen on when Conn is nil. When Addr is
	// empty as well, the "any" interface and the DefaultService are used.
	Addr string

	// Writer is the object incoming ValueLists are sent to.
	Writer api.Writer

	// BufferSize is the maximum packet size to accept. ListenAndWrite
	// substitutes DefaultBufferSize when this is zero.
	BufferSize uint16

	// PasswordLookup maps a user to a password.
	PasswordLookup PasswordLookup

	// SecurityLevel is the minimal required security level.
	SecurityLevel SecurityLevel

	// TypesDB is used for looking up DS names and verifying data source
	// types.
	TypesDB *api.TypesDB

	// Interface is the name of the interface to use when subscribing to a
	// multicast group. Has no effect when using unicast.
	Interface string
}
// ListenAndWrite listens on the provided UDP connection (or creates one using
// Addr if Conn is nil), parses the received packets and writes them to the
// provided api.Writer.
//
// When a connection has to be created and Addr is empty, ":" + DefaultService
// is used. If the resolved address is a multicast address, the server
// subscribes to that group, using the interface named by Interface when it is
// set. A BufferSize of zero is replaced by DefaultBufferSize.
//
// ListenAndWrite blocks until ctx is done or reading from the connection
// fails. It returns ctx.Err() in the former case and the read error in the
// latter. Errors from resolving the address, looking up the interface or
// opening the socket are returned unchanged. Packets that cannot be parsed are
// logged and skipped. Once the read loop has ended, the connection is closed
// and srv.Conn is reset to nil.
func (srv *Server) ListenAndWrite(ctx context.Context) error {
	if srv.Conn == nil {
		addr := srv.Addr
		if addr == "" {
			addr = ":" + DefaultService
		}

		// Resolve the effective address (addr, not srv.Addr), so that the
		// DefaultService fallback above actually takes effect.
		laddr, err := net.ResolveUDPAddr("udp", addr)
		if err != nil {
			return err
		}

		if laddr.IP != nil && laddr.IP.IsMulticast() {
			// ifi stays nil when no interface name was configured.
			var ifi *net.Interface
			if srv.Interface != "" {
				if ifi, err = net.InterfaceByName(srv.Interface); err != nil {
					return err
				}
			}
			srv.Conn, err = net.ListenMulticastUDP("udp", ifi, laddr)
		} else {
			srv.Conn, err = net.ListenUDP("udp", laddr)
		}
		if err != nil {
			return err
		}
	}

	if srv.BufferSize == 0 {
		srv.BufferSize = DefaultBufferSize
	}
	buf := make([]byte, srv.BufferSize)

	popts := ParseOpts{
		PasswordLookup: srv.PasswordLookup,
		SecurityLevel:  srv.SecurityLevel,
		TypesDB:        srv.TypesDB,
	}

	// Work on a local copy of the connection so the watcher goroutine never
	// touches srv.Conn while this function resets it.
	conn := srv.Conn

	// shutdown releases the watcher goroutine on every return path below.
	shutdown := make(chan struct{})
	defer close(shutdown)

	go func() {
		select {
		case <-ctx.Done():
			// this interrupts the below conn.Read().
			conn.Close()
		case <-shutdown:
		}
	}()

	for {
		n, err := conn.Read(buf)
		if err != nil {
			// Close unconditionally; a second Close on an already
			// closed connection only returns an error, which is
			// deliberately ignored here.
			conn.Close()
			srv.Conn = nil

			// A non-nil ctx.Err() means the context got cancelled;
			// report that instead of the resulting read error.
			if ctxErr := ctx.Err(); ctxErr != nil {
				return ctxErr
			}
			// network error
			return err
		}

		valueLists, err := Parse(buf[:n], popts)
		if err != nil {
			log.Printf("error while parsing: %v", err)
			continue
		}

		// Dispatch asynchronously so a slow writer does not stall reads.
		go dispatch(ctx, valueLists, srv.Writer)
	}
}
// dispatch writes each element of valueLists to d, in order, passing ctx
// through to every Write call. A failed write is logged and does not prevent
// the remaining value lists from being written.
func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) {
	for i := range valueLists {
		err := d.Write(ctx, valueLists[i])
		if err == nil {
			continue
		}
		log.Printf("error while dispatching: %v", err)
	}
}
|