1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
/*
* Copyright (c) 2017,2018 by Farsight Security, Inc.
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/
package nmsg
import (
"bufio"
"fmt"
"io"
"time"
)
// An Input is a source of NMSG Payloads.
type Input interface {
// Recv() returns the next Nmsg Payload from the input,
// blocking if none is available.
Recv() (*NmsgPayload, error)
// Stats() returns interface statistics
Stats() *InputStatistics
}
// InputStatistics holds useful metrics for input performance.
type InputStatistics struct {
// Count of total container received, including fragments
InputContainers uint64
// Count of total bytes received and processed
InputBytes uint64
// Count of containers marked lost by sequence tracking
LostContainers uint64
// Count of fragment containers received
InputFragments uint64
// Count of fragments expired from cache
ExpiredFragments uint64
// Count of containers dropped due to incomplete fragments
PartialContainers uint64
}
type dataError struct{ error }
func (d *dataError) Error() string { return d.error.Error() }
// IsDataError returns true of the supplied error is an error unpacking
// or decoding the NMSG data rather than an I/O error with the input.
func IsDataError(err error) bool {
_, ok := err.(*dataError)
return ok
}
type input struct {
r io.Reader
n Nmsg
fcache *fragCache
scache *seqCache
stats InputStatistics
}
func (i *input) Stats() *InputStatistics {
res := &InputStatistics{}
*res = i.stats
return res
}
// NewInput constructs an input from the supplied Reader.
// The size parameter sizes the input buffer, and should
// be greater than the maximum anticipated container size
// for datagram inputs.
func NewInput(r io.Reader, size int) Input {
return &input{
r: bufio.NewReaderSize(r, size),
n: Nmsg{},
fcache: newFragmentCache(2 * time.Minute),
scache: newSequenceCache(2 * time.Minute),
}
}
type checksumError struct {
calc, wire uint32
}
func (c *checksumError) Error() string {
return fmt.Sprintf("checksum mismatch: %x != %x", c.calc, c.wire)
}
func (i *input) Recv() (*NmsgPayload, error) {
for len(i.n.Payloads) == 0 {
var c Container
n, err := c.ReadFrom(i.r)
if err != nil {
return nil, err
}
if n == 0 {
return nil, io.EOF
}
i.stats.InputBytes += uint64(n)
if c.NmsgFragment != nil {
i.stats.InputFragments++
var b []byte
if b = i.fcache.Insert(c.NmsgFragment); b == nil {
continue
}
err = c.fromNmsgBytes(b, c.isCompressed, false)
if err != nil {
return nil, &dataError{err}
}
}
i.stats.InputContainers++
i.stats.LostContainers += uint64(i.scache.Update(&c.Nmsg))
i.scache.Expire()
i.n = c.Nmsg
}
ccount, fcount := i.fcache.Expire()
i.stats.PartialContainers += uint64(ccount)
i.stats.ExpiredFragments += uint64(fcount)
p := i.n.Payloads[0]
i.n.Payloads = i.n.Payloads[1:]
var err error
if len(i.n.PayloadCrcs) > 0 {
wire := i.n.PayloadCrcs[0]
calc := nmsgCRC(p.Payload)
if wire != calc {
err = &dataError{&checksumError{calc, wire}}
}
i.n.PayloadCrcs = i.n.PayloadCrcs[1:]
}
return p, err
}
|