File: input.go

package info (click to toggle)
golang-github-farsightsec-go-nmsg 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental, forky, sid, trixie
  • size: 500 kB
  • sloc: sh: 21; makefile: 3
file content (136 lines) | stat: -rw-r--r-- 3,269 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/*
 * Copyright (c) 2017,2018 by Farsight Security, Inc.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

package nmsg

import (
	"bufio"
	"fmt"
	"io"
	"time"
)

// An Input is a source of NMSG Payloads.
type Input interface {
	// Recv() returns the next Nmsg Payload from the input,
	// blocking if none is available.
	Recv() (*NmsgPayload, error)
	// Stats() returns interface statistics
	Stats() *InputStatistics
}

// InputStatistics holds useful metrics for input performance.
type InputStatistics struct {
	// Count of total container received, including fragments
	InputContainers uint64
	// Count of total bytes received and processed
	InputBytes uint64
	// Count of containers marked lost by sequence tracking
	LostContainers uint64
	// Count of fragment containers received
	InputFragments uint64
	// Count of fragments expired from cache
	ExpiredFragments uint64
	// Count of containers dropped due to incomplete fragments
	PartialContainers uint64
}

type dataError struct{ error }

func (d *dataError) Error() string { return d.error.Error() }

// IsDataError returns true of the supplied error is an error unpacking
// or decoding the NMSG data rather than an I/O error with the input.
func IsDataError(err error) bool {
	_, ok := err.(*dataError)
	return ok
}

type input struct {
	r      io.Reader
	n      Nmsg
	fcache *fragCache
	scache *seqCache
	stats  InputStatistics
}

func (i *input) Stats() *InputStatistics {
	res := &InputStatistics{}
	*res = i.stats
	return res
}

// NewInput constructs an input from the supplied Reader.
// The size parameter sizes the input buffer, and should
// be greater than the maximum anticipated container size
// for datagram inputs.
func NewInput(r io.Reader, size int) Input {
	return &input{
		r:      bufio.NewReaderSize(r, size),
		n:      Nmsg{},
		fcache: newFragmentCache(2 * time.Minute),
		scache: newSequenceCache(2 * time.Minute),
	}
}

type checksumError struct {
	calc, wire uint32
}

func (c *checksumError) Error() string {
	return fmt.Sprintf("checksum mismatch: %x != %x", c.calc, c.wire)
}

func (i *input) Recv() (*NmsgPayload, error) {
	for len(i.n.Payloads) == 0 {
		var c Container
		n, err := c.ReadFrom(i.r)
		if err != nil {
			return nil, err
		}
		if n == 0 {
			return nil, io.EOF
		}

		i.stats.InputBytes += uint64(n)

		if c.NmsgFragment != nil {
			i.stats.InputFragments++
			var b []byte
			if b = i.fcache.Insert(c.NmsgFragment); b == nil {
				continue
			}
			err = c.fromNmsgBytes(b, c.isCompressed, false)
			if err != nil {
				return nil, &dataError{err}
			}
		}

		i.stats.InputContainers++
		i.stats.LostContainers += uint64(i.scache.Update(&c.Nmsg))
		i.scache.Expire()
		i.n = c.Nmsg
	}
	ccount, fcount := i.fcache.Expire()
	i.stats.PartialContainers += uint64(ccount)
	i.stats.ExpiredFragments += uint64(fcount)
	p := i.n.Payloads[0]
	i.n.Payloads = i.n.Payloads[1:]

	var err error
	if len(i.n.PayloadCrcs) > 0 {
		wire := i.n.PayloadCrcs[0]
		calc := nmsgCRC(p.Payload)
		if wire != calc {
			err = &dataError{&checksumError{calc, wire}}
		}
		i.n.PayloadCrcs = i.n.PayloadCrcs[1:]
	}

	return p, err
}