File: server.go

package info (click to toggle)
golang-collectd 0.3.0%2Bgit20181025.f80706d-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 312 kB
  • sloc: makefile: 3
file content (132 lines) | stat: -rw-r--r-- 3,434 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package network // import "collectd.org/network"

import (
	"context"
	"log"
	"net"

	"collectd.org/api"
)

// ListenAndWrite listens on the provided UDP address, parses the received
// packets and writes them to the provided api.Writer.
// This is a convenience function for a minimally configured server. If you
// need more control, see the "Server" type below.
func ListenAndWrite(ctx context.Context, address string, d api.Writer) error {
	srv := &Server{
		Addr:   address,
		Writer: d,
	}

	return srv.ListenAndWrite(ctx)
}

// Server holds parameters for running a collectd server.
type Server struct {
	// UDP connection the server listens on. If Conn is nil, a new server
	// connection is opened. The connection is closed by ListenAndWrite
	// before returning.
	Conn *net.UDPConn
	// Address to listen on if Conn is nil. If Addr is empty, too, then the
	// "any" interface and the DefaultService will be used.
	Addr           string
	Writer         api.Writer     // Object used to send incoming ValueLists to.
	BufferSize     uint16         // Maximum packet size to accept.
	PasswordLookup PasswordLookup // User to password lookup.
	SecurityLevel  SecurityLevel  // Minimal required security level.
	TypesDB        *api.TypesDB   // TypesDB for looking up DS names and verify data source types.
	// Interface is the name of the interface to use when subscribing to a
	// multicast group. Has no effect when using unicast.
	Interface string
}

// ListenAndWrite listens on the provided UDP connection (or creates one using
// Addr if Conn is nil), parses the received packets and writes them to the
// provided api.Writer.
func (srv *Server) ListenAndWrite(ctx context.Context) error {
	if srv.Conn == nil {
		addr := srv.Addr
		if addr == "" {
			addr = ":" + DefaultService
		}

		laddr, err := net.ResolveUDPAddr("udp", srv.Addr)
		if err != nil {
			return err
		}

		if laddr.IP != nil && laddr.IP.IsMulticast() {
			var ifi *net.Interface
			if srv.Interface != "" {
				if ifi, err = net.InterfaceByName(srv.Interface); err != nil {
					return err
				}
			}
			srv.Conn, err = net.ListenMulticastUDP("udp", ifi, laddr)
		} else {
			srv.Conn, err = net.ListenUDP("udp", laddr)
		}
		if err != nil {
			return err
		}
	}

	if srv.BufferSize <= 0 {
		srv.BufferSize = DefaultBufferSize
	}
	buf := make([]byte, srv.BufferSize)

	popts := ParseOpts{
		PasswordLookup: srv.PasswordLookup,
		SecurityLevel:  srv.SecurityLevel,
		TypesDB:        srv.TypesDB,
	}

	var ctxErr error
	shutdown := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			ctxErr = ctx.Err()
			// this interrupts the below Conn.Read().
			srv.Conn.Close()
			return
		case <-shutdown:
			return
		}
	}()

	for {
		n, err := srv.Conn.Read(buf)
		if err != nil {
			// if ctxErr is non-nil the context got cancelled.
			if ctxErr != nil {
				srv.Conn = nil
				return ctxErr
			}

			// network error: shutdown the goroutine, close the
			// connection and return.
			close(shutdown)
			srv.Conn.Close()
			srv.Conn = nil
			return err
		}

		valueLists, err := Parse(buf[:n], popts)
		if err != nil {
			log.Printf("error while parsing: %v", err)
			continue
		}

		go dispatch(ctx, valueLists, srv.Writer)
	}
}

func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) {
	for _, vl := range valueLists {
		if err := d.Write(ctx, vl); err != nil {
			log.Printf("error while dispatching: %v", err)
		}
	}
}