// Package main is a loopback demonstration of the pion NACK interceptors: a
// sender goroutine streams RTP over UDP through a NACK responder, while the
// receiver feeds each incoming packet to a NACK generator and sends the RTCP
// it produces back to the sender.
package main
import (
"fmt"
"log"
"net"
"time"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/nack"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// Settings shared by the sender and the receiver in this file.
const (
// listenPort is the UDP port on 127.0.0.1 that receiveRoutine listens on and
// sendRoutine dials.
listenPort = 6420
// mtu is the size, in bytes, of the buffers used to read UDP datagrams.
mtu = 1500
// ssrc is the SSRC of the single RTP stream; both routines bind their
// interceptor stream with it and the sender stamps it on every packet.
ssrc = 5000
)
// main runs the RTP sender in a background goroutine and the receiver on the
// main goroutine. receiveRoutine loops forever, so the program only ends on a
// panic or when it is terminated externally.
func main() {
// The sender is started first but runs concurrently with the receiver's
// setup; nothing here waits for the listener to be bound.
go sendRoutine()
receiveRoutine()
}
// receiveRoutine listens for RTP datagrams on 127.0.0.1:listenPort and passes
// each one through an interceptor chain that contains a NACK generator. RTCP
// packets emitted by the chain are marshaled and written back to the address
// that sent the first datagram. The function never returns; any setup, read,
// or interceptor error results in a panic.
func receiveRoutine() {
	listenAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
	if err != nil {
		panic(err)
	}

	udpConn, err := net.ListenUDP("udp4", listenAddr)
	if err != nil {
		panic(err)
	}

	// Build the NACK generator from its factory.
	factory, err := nack.NewGeneratorInterceptor()
	if err != nil {
		panic(err)
	}

	nackGenerator, err := factory.NewInterceptor("")
	if err != nil {
		panic(err)
	}

	// The chain consists of the NACK generator only.
	pipeline := interceptor.NewChain([]interceptor.Interceptor{nackGenerator})

	// Describe the single incoming SSRC stream and advertise NACK feedback.
	remoteStream := &interceptor.StreamInfo{
		SSRC:         ssrc,
		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
	}

	// Final reader in the chain: it does nothing with the packet and reports
	// the whole buffer as read.
	passthrough := interceptor.RTPReaderFunc(
		func(pkt []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
			return len(pkt), nil, nil
		},
	)

	// Reader for that one SSRC stream; every received RTP packet is pushed
	// through it so the interceptors can observe it.
	rtpReader := pipeline.BindRemoteStream(remoteStream, passthrough)

	datagram := make([]byte, mtu)
	writerBound := false

	for {
		n, peer, readErr := udpConn.ReadFrom(datagram)
		if readErr != nil {
			panic(readErr)
		}

		log.Println("Received RTP")

		if _, _, err := rtpReader.Read(datagram[:n], nil); err != nil {
			panic(err)
		}

		if writerBound {
			continue
		}

		// Bind the chain-wide RTCP writer exactly once. It is done here rather
		// than during setup because the peer address is only known after the
		// first datagram has arrived. The callback fires every time an RTCP
		// packet is ready to be sent.
		pipeline.BindRTCPWriter(interceptor.RTCPWriterFunc(
			func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
				raw, marshalErr := rtcp.Marshal(pkts)
				if marshalErr != nil {
					return 0, marshalErr
				}

				return udpConn.WriteTo(raw, peer)
			},
		))
		writerBound = true
	}
}
// sendRoutine dials the UDP listener opened by receiveRoutine and streams RTP
// packets to it through an interceptor chain that contains a NACK responder.
//
// A background goroutine reads RTCP datagrams arriving on the same socket and
// hands them to the chain. The main loop sends one RTP packet with a
// three-byte payload every 200ms and never returns. Setup errors and RTCP
// read errors panic; an error from writing an RTP packet is printed and the
// loop carries on with the next sequence number.
func sendRoutine() {
	// Dial the UDP listener that receiveRoutine creates.
	serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
	if err != nil {
		panic(err)
	}

	conn, err := net.DialUDP("udp4", nil, serverAddr)
	if err != nil {
		panic(err)
	}

	// Create the NACK responder.
	responderFactory, err := nack.NewResponderInterceptor()
	if err != nil {
		panic(err)
	}

	responder, err := responderFactory.NewInterceptor("")
	if err != nil {
		panic(err)
	}

	// Create our interceptor chain with just the NACK responder.
	chain := interceptor.NewChain([]interceptor.Interceptor{responder})

	// Set the interceptor-wide RTCP reader. This is the handle used to feed
	// RTCP received from the peer back into the interceptor chain; the
	// terminal reader itself just reports the whole buffer as read.
	rtcpReader := chain.BindRTCPReader(interceptor.RTCPReaderFunc(
		func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
			return len(in), nil, nil
		},
	))

	// Create the writer for the single SSRC stream. The callback fires every
	// time an RTP packet is ready to be put on the wire.
	streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
		SSRC:         ssrc,
		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
	}, interceptor.RTPWriterFunc(
		func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
			headerBuf, err := header.Marshal()
			if err != nil {
				// Report the failure through the callback's error return
				// instead of panicking, so whoever invoked the writer decides
				// how to handle it.
				return 0, fmt.Errorf("marshaling RTP header: %w", err)
			}

			return conn.Write(append(headerBuf, payload...))
		},
	))

	// Read RTCP packets sent by the receiver and pass them into the chain.
	go func() {
		for rtcpBuf := make([]byte, mtu); ; {
			i, err := conn.Read(rtcpBuf)
			if err != nil {
				panic(err)
			}

			log.Println("Received NACK")

			if _, _, err = rtcpReader.Read(rtcpBuf[:i], nil); err != nil {
				panic(err)
			}
		}
	}()

	// sequenceNumber is a uint16, so it wraps back to 0 after 65535.
	for sequenceNumber := uint16(0); ; sequenceNumber++ {
		// Send an RTP packet with a payload of 0x0, 0x1, 0x2.
		if _, err := streamWriter.Write(&rtp.Header{
			Version:        2,
			SSRC:           ssrc,
			SequenceNumber: sequenceNumber,
		}, []byte{0x0, 0x1, 0x2}, nil); err != nil {
			fmt.Println(err)
		}

		time.Sleep(time.Millisecond * 200)
	}
}
|