1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
package packetdump
import (
"fmt"
"io"
"os"
"sync"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// PacketDumper dumps RTP and RTCP packets to io.Writers.
//
// Packets handed to logRTPPacket / logRTCPPackets are passed over channels to
// a single background goroutine (loop), which filters, formats, and writes
// them. Close signals that goroutine to stop and waits for it to exit.
type PacketDumper struct {
// log reports failures to write a formatted packet.
log logging.LeveledLogger

// wg tracks the background loop goroutine so Close can wait for it.
wg sync.WaitGroup
// close is closed by Close to signal shutdown to the loop and to senders.
close chan struct{}

// rtpChan carries RTP dumps from logRTPPacket to the loop.
rtpChan chan *rtpDump
// rtcpChan carries RTCP dumps from logRTCPPackets to the loop.
rtcpChan chan *rtcpDump

// rtpStream is the destination for formatted RTP packets.
rtpStream io.Writer
// rtcpStream is the destination for formatted RTCP packets.
rtcpStream io.Writer

// rtpFormat renders an RTP packet and its attributes for output.
rtpFormat RTPFormatCallback
// rtcpFormat renders a batch of RTCP packets and its attributes for output.
rtcpFormat RTCPFormatCallback

// rtpFilter reports whether an RTP packet should be dumped.
rtpFilter RTPFilterCallback
// rtcpFilter reports whether a batch of RTCP packets should be dumped.
rtcpFilter RTCPFilterCallback
}
// NewPacketDumper builds a PacketDumper, applies opts in order, and starts
// the background goroutine that writes the dumps.
//
// Before the options run, both streams point at os.Stdout, the default
// formatters are installed, and the filters accept every packet. If an
// option fails, its error is returned and the goroutine is not started.
func NewPacketDumper(opts ...PacketDumperOption) (*PacketDumper, error) {
	// Default filters: let everything through.
	acceptRTP := func(pkt *rtp.Packet) bool { return true }
	acceptRTCP := func(pkt []rtcp.Packet) bool { return true }

	dumper := &PacketDumper{
		log:        logging.NewDefaultLoggerFactory().NewLogger("packet_dumper"),
		close:      make(chan struct{}),
		rtpChan:    make(chan *rtpDump),
		rtcpChan:   make(chan *rtcpDump),
		rtpStream:  os.Stdout,
		rtcpStream: os.Stdout,
		rtpFormat:  DefaultRTPFormatter,
		rtcpFormat: DefaultRTCPFormatter,
		rtpFilter:  acceptRTP,
		rtcpFilter: acceptRTCP,
	}

	for i := range opts {
		if err := opts[i](dumper); err != nil {
			return nil, err
		}
	}

	// Register the goroutine before launching it so Close can wait on it.
	dumper.wg.Add(1)
	go dumper.loop()

	return dumper, nil
}
// logRTPPacket hands one RTP packet to the dump goroutine.
//
// The header is copied by value into a new rtp.Packet; the payload slice is
// referenced, not copied. The call blocks until the loop accepts the packet
// or the dumper is closed, in which case the packet is dropped.
func (d *PacketDumper) logRTPPacket(header *rtp.Header, payload []byte, attributes interceptor.Attributes) {
	pkt := &rtp.Packet{
		Header:  *header,
		Payload: payload,
	}
	entry := &rtpDump{
		attributes: attributes,
		packet:     pkt,
	}

	select {
	case <-d.close:
	case d.rtpChan <- entry:
	}
}
// logRTCPPackets hands a batch of RTCP packets to the dump goroutine.
//
// The call blocks until the loop accepts the batch or the dumper is closed,
// in which case the batch is dropped.
func (d *PacketDumper) logRTCPPackets(pkts []rtcp.Packet, attributes interceptor.Attributes) {
	entry := &rtcpDump{
		attributes: attributes,
		packets:    pkts,
	}

	select {
	case <-d.close:
	case d.rtcpChan <- entry:
	}
}
// Close stops the PacketDumper and waits for its background goroutine to
// exit. It always returns nil. Calling Close again after it has returned is
// a no-op apart from the wait.
//
// NOTE(review): the closed check and the channel close are not atomic, so
// concurrent calls to Close could both attempt the close — confirm callers
// never invoke Close from multiple goroutines at once.
func (d *PacketDumper) Close() error {
	// Runs last, on every return path: block until loop has finished.
	defer d.wg.Wait()

	if d.isClosed() {
		return nil
	}

	close(d.close)

	return nil
}
// isClosed reports, without blocking, whether the close channel has already
// been closed.
func (d *PacketDumper) isClosed() bool {
	closed := false

	select {
	case <-d.close:
		closed = true
	default:
		// Channel still open: nothing to receive.
	}

	return closed
}
// loop is the background goroutine body. It receives RTP and RTCP dumps,
// drops those rejected by the matching filter, formats the rest, and writes
// them to the matching stream. Write failures are logged and do not stop the
// loop. It returns once the close channel is closed, marking the WaitGroup
// done on the way out.
func (d *PacketDumper) loop() {
	defer d.wg.Done()

	for {
		select {
		case <-d.close:
			return

		case rtpEntry := <-d.rtpChan:
			if !d.rtpFilter(rtpEntry.packet) {
				continue
			}
			formatted := d.rtpFormat(rtpEntry.packet, rtpEntry.attributes)
			_, err := fmt.Fprint(d.rtpStream, formatted)
			if err != nil {
				d.log.Errorf("could not dump RTP packet %v", err)
			}

		case rtcpEntry := <-d.rtcpChan:
			if !d.rtcpFilter(rtcpEntry.packets) {
				continue
			}
			formatted := d.rtcpFormat(rtcpEntry.packets, rtcpEntry.attributes)
			_, err := fmt.Fprint(d.rtcpStream, formatted)
			if err != nil {
				d.log.Errorf("could not dump RTCP packet %v", err)
			}
		}
	}
}
|