1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
|
// Copyright 2018 Google, Inc. All rights reserved.
//
// Use of this source code is governed by a BSD-style license
// that can be found in the LICENSE file in the root of the source
// tree.
// Package lcmdefrag contains a defragmenter for LCM messages.
package lcmdefrag
import (
"fmt"
"time"
"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
)
const (
// Packages are cleaned up/removed after no input was received for this
// amount of seconds.
timeout time.Duration = 3 * time.Second
)
type lcmPacket struct {
lastPacket time.Time
done bool
recFrags uint16
totalFrags uint16
frags map[uint16]*layers.LCM
}
// LCMDefragmenter supports defragmentation of LCM messages.
//
// References
//
// https://lcm-proj.github.io/
// https://github.com/lcm-proj/lcm
type LCMDefragmenter struct {
packets map[uint32]*lcmPacket
}
func newLCMPacket(totalFrags uint16) *lcmPacket {
return &lcmPacket{
done: false,
recFrags: 0,
totalFrags: totalFrags,
frags: make(map[uint16]*layers.LCM),
}
}
// NewLCMDefragmenter returns a new LCMDefragmenter.
func NewLCMDefragmenter() *LCMDefragmenter {
return &LCMDefragmenter{
packets: make(map[uint32]*lcmPacket),
}
}
func (lp *lcmPacket) append(in *layers.LCM) {
lp.frags[in.FragmentNumber] = in
lp.recFrags++
lp.lastPacket = time.Now()
}
func (lp *lcmPacket) assemble() (out *layers.LCM, err error) {
var blob []byte
//Extract packets
for i := uint16(0); i < lp.totalFrags; i++ {
fragment, ok := lp.frags[i]
if !ok {
err = fmt.Errorf("Tried to defragment incomplete packet. Waiting "+
"for more potential (unordered) packets... %d", i)
return
}
// For the very first packet, we also want the header.
if i == 0 {
blob = append(blob, fragment.LayerContents()...)
}
// Append the data for each packet.
blob = append(blob, fragment.Payload()...)
}
packet := gopacket.NewPacket(blob, layers.LayerTypeLCM, gopacket.NoCopy)
lcmHdrLayer := packet.Layer(layers.LayerTypeLCM)
out, ok := lcmHdrLayer.(*layers.LCM)
if !ok {
err = fmt.Errorf("Error while decoding the defragmented packet. " +
"Erasing/dropping packet.")
}
lp.done = true
return
}
func (ld *LCMDefragmenter) cleanUp() {
for key, packet := range ld.packets {
if packet.done || time.Now().Sub(packet.lastPacket) > timeout {
delete(ld.packets, key)
}
}
}
// Defrag takes a reference to an LCM packet and processes it.
// In case the packet does not need to be defragmented, it immediately returns
// the as in passed reference. In case in was the last missing fragment, out
// will be the defragmented packet. If in was a fragment, but we are awaiting
// more, out will be set to nil.
// In the case that in was nil, we will just run the internal cleanup of the
// defragmenter that times out packages.
// If an error was encountered during defragmentation, out will also be nil,
// while err will contain further information on the failure.
func (ld *LCMDefragmenter) Defrag(in *layers.LCM) (out *layers.LCM, err error) {
// Timeout old packages and erase error prone ones.
ld.cleanUp()
// For running cleanup only
if in == nil {
return
}
// Quick check if this is acutally a single packet. In that case, just
// return it quickly.
if !in.Fragmented {
out = in
return
}
// Do we need to start a new fragments obj?
if _, ok := ld.packets[in.SequenceNumber]; !ok {
ld.packets[in.SequenceNumber] = newLCMPacket(in.TotalFragments)
}
// Append the packet
ld.packets[in.SequenceNumber].append(in)
// Check if this is the last package of that series
if ld.packets[in.SequenceNumber].recFrags == in.TotalFragments {
out, err = ld.packets[in.SequenceNumber].assemble()
}
return
}
|