File: server.go

package info (click to toggle)
golang-collectd 0.5.0%2Bgit20240104.26404de-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 468 kB
  • sloc: ansic: 324; sh: 7; makefile: 3
file content (129 lines) | stat: -rw-r--r-- 3,280 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package network // import "collectd.org/network"

import (
	"context"
	"log"
	"net"
	"sync"

	"collectd.org/api"
)

// ListenAndWrite listens on the provided UDP address, parses the received
// packets and writes them to the provided api.Writer.
// This is a convenience function for a minimally configured server. If you
// need more control, see the "Server" type below.
func ListenAndWrite(ctx context.Context, address string, d api.Writer) error {
	srv := &Server{
		Addr:   address,
		Writer: d,
	}

	return srv.ListenAndWrite(ctx)
}

// Server holds parameters for running a collectd server.
type Server struct {
	// UDP connection the server listens on. If Conn is nil, a new server
	// connection is opened. The connection is closed by ListenAndWrite
	// before returning.
	Conn *net.UDPConn
	// Address to listen on if Conn is nil. If Addr is empty, too, then the
	// "any" interface and the DefaultService will be used.
	Addr           string
	Writer         api.Writer     // Object used to send incoming ValueLists to.
	BufferSize     uint16         // Maximum packet size to accept.
	PasswordLookup PasswordLookup // User to password lookup.
	SecurityLevel  SecurityLevel  // Minimal required security level.
	TypesDB        *api.TypesDB   // TypesDB for looking up DS names and verify data source types.
	// Interface is the name of the interface to use when subscribing to a
	// multicast group. Has no effect when using unicast.
	Interface string
}

// ListenAndWrite listens on the provided UDP connection (or creates one using
// Addr if Conn is nil), parses the received packets and writes them to the
// provided api.Writer.
func (srv *Server) ListenAndWrite(ctx context.Context) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	if srv.Conn == nil {
		addr := srv.Addr
		if addr == "" {
			addr = ":" + DefaultService
		}

		laddr, err := net.ResolveUDPAddr("udp", addr)
		if err != nil {
			return err
		}

		if laddr.IP != nil && laddr.IP.IsMulticast() {
			var ifi *net.Interface
			if srv.Interface != "" {
				if ifi, err = net.InterfaceByName(srv.Interface); err != nil {
					return err
				}
			}
			srv.Conn, err = net.ListenMulticastUDP("udp", ifi, laddr)
		} else {
			srv.Conn, err = net.ListenUDP("udp", laddr)
		}
		if err != nil {
			return err
		}
	}

	if srv.BufferSize <= 0 {
		srv.BufferSize = DefaultBufferSize
	}

	popts := ParseOpts{
		PasswordLookup: srv.PasswordLookup,
		SecurityLevel:  srv.SecurityLevel,
		TypesDB:        srv.TypesDB,
	}

	go func() {
		select {
		case <-ctx.Done():
			// this interrupts the below Conn.Read().
			srv.Conn.Close()
		}
	}()

	var wg sync.WaitGroup
	for {
		buf := make([]byte, srv.BufferSize)
		n, err := srv.Conn.Read(buf)
		if err != nil {
			srv.Conn.Close()
			wg.Wait()
			if ctx.Err() != nil {
				return ctx.Err()
			}
			return err
		}

		valueLists, err := Parse(buf[:n], popts)
		if err != nil {
			log.Printf("error while parsing: %v", err)
			continue
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			dispatch(ctx, valueLists, srv.Writer)
		}()
	}
}

func dispatch(ctx context.Context, valueLists []*api.ValueList, d api.Writer) {
	for _, vl := range valueLists {
		if err := d.Write(ctx, vl); err != nil {
			log.Printf("error while dispatching: %v", err)
		}
	}
}