File: main.go

package info (click to toggle)
golang-github-pion-interceptor 0.1.12-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, forky, sid, trixie
  • size: 764 kB
  • sloc: makefile: 8
file content (161 lines) | stat: -rw-r--r-- 4,076 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main

import (
	"fmt"
	"log"
	"net"
	"time"

	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/nack"
	"github.com/pion/rtcp"
	"github.com/pion/rtp"
)

// Parameters shared by the sending and receiving halves of this example.
const (
	// listenPort is the UDP port on 127.0.0.1 that receiveRoutine listens on
	// and that sendRoutine dials.
	listenPort = 6420
	// mtu is the size, in bytes, of the buffers used to read datagrams from
	// the sockets.
	mtu        = 1500
	// ssrc identifies the single RTP stream bound on both interceptor chains
	// and is stamped on every RTP header the sender writes.
	ssrc       = 5000
)

// main starts the sender in a background goroutine and then runs the receiver
// on the main goroutine, so the process lives for as long as receiveRoutine
// keeps running.
//
// NOTE(review): sendRoutine is launched before receiveRoutine has bound its
// socket; presumably an early packet could be sent before the listener
// exists — confirm whether that matters for this example.
func main() {
	go sendRoutine()
	receiveRoutine()
}

// receiveRoutine listens for RTP on 127.0.0.1:listenPort and pushes every
// received datagram through an interceptor chain that holds a NACK generator.
// RTCP emitted by the chain is marshaled and written back to the address the
// first datagram came from.
//
// The function loops forever; any setup, read or interceptor error panics.
// The socket and the chain are closed while such a panic unwinds.
func receiveRoutine() {
	serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
	if err != nil {
		panic(err)
	}

	conn, err := net.ListenUDP("udp4", serverAddr)
	if err != nil {
		panic(err)
	}
	// The loop below only exits by panicking; make sure the socket is
	// released when that happens.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			log.Println(closeErr)
		}
	}()

	// Create NACK Generator
	generatorFactory, err := nack.NewGeneratorInterceptor()
	if err != nil {
		panic(err)
	}

	generator, err := generatorFactory.NewInterceptor("")
	if err != nil {
		panic(err)
	}

	// Create our interceptor chain with just a NACK Generator
	chain := interceptor.NewChain([]interceptor.Interceptor{generator})
	// Deferred after the socket's Close, so the chain is closed first and
	// the socket afterwards.
	defer func() {
		if closeErr := chain.Close(); closeErr != nil {
			log.Println(closeErr)
		}
	}()

	// Bind the reader for a single remote SSRC stream.
	// Received RTP is pushed through the returned reader; the innermost reader
	// supplied here has nothing left to do and only reports the packet length.
	streamReader := chain.BindRemoteStream(&interceptor.StreamInfo{
		SSRC:         ssrc,
		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
	}, interceptor.RTPReaderFunc(func(b []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
		return len(b), nil, nil
	}))

	buffer := make([]byte, mtu)
	rtcpBound := false

	for {
		n, addr, err := conn.ReadFrom(buffer)
		if err != nil {
			panic(err)
		}

		log.Println("Received RTP")

		if _, _, err := streamReader.Read(buffer[:n], nil); err != nil {
			panic(err)
		}

		if rtcpBound {
			continue
		}

		// Set the interceptor wide RTCP Writer
		// this is a callback that is fired everytime a RTCP packet is ready to be sent.
		// It can only be bound once a datagram has arrived, because the reply
		// address is taken from that first datagram.
		chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, _ interceptor.Attributes) (int, error) {
			buf, marshalErr := rtcp.Marshal(pkts)
			if marshalErr != nil {
				return 0, marshalErr
			}

			return conn.WriteTo(buf, addr)
		}))

		rtcpBound = true
	}
}

// sendRoutine dials the UDP listener opened by receiveRoutine and writes one
// RTP packet every 200ms through an interceptor chain that holds a NACK
// responder. Datagrams read back from the socket are handed to the chain as
// RTCP.
//
// The function never returns. Setup errors and errors while reading RTCP
// panic; errors from writing an RTP packet are logged and sending continues.
func sendRoutine() {
	// Dial our UDP listener that we create in receiveRoutine
	serverAddr, err := net.ResolveUDPAddr("udp4", fmt.Sprintf("127.0.0.1:%d", listenPort))
	if err != nil {
		panic(err)
	}

	conn, err := net.DialUDP("udp4", nil, serverAddr)
	if err != nil {
		panic(err)
	}

	// Create NACK Responder
	responderFactory, err := nack.NewResponderInterceptor()
	if err != nil {
		panic(err)
	}

	responder, err := responderFactory.NewInterceptor("")
	if err != nil {
		panic(err)
	}

	// Create our interceptor chain with just a NACK Responder.
	chain := interceptor.NewChain([]interceptor.Interceptor{responder})

	// Set the interceptor wide RTCP Reader
	// this is a handle to send NACKs back into the interceptor.
	rtcpReader := chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(in []byte, _ interceptor.Attributes) (int, interceptor.Attributes, error) {
		return len(in), nil, nil
	}))

	// Create the writer just for a single SSRC stream
	// this is a callback that is fired everytime a RTP packet is ready to be sent
	streamWriter := chain.BindLocalStream(&interceptor.StreamInfo{
		SSRC:         ssrc,
		RTCPFeedback: []interceptor.RTCPFeedback{{Type: "nack", Parameter: ""}},
	}, interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, _ interceptor.Attributes) (int, error) {
		headerBuf, marshalErr := header.Marshal()
		if marshalErr != nil {
			// Hand the failure back through Write's error return so the
			// send loop can report it, rather than crashing the process.
			return 0, marshalErr
		}

		return conn.Write(append(headerBuf, payload...))
	}))

	// Read RTCP packets sent by receiver and pass into Interceptor
	go func() {
		rtcpBuf := make([]byte, mtu)

		for {
			n, readErr := conn.Read(rtcpBuf)
			if readErr != nil {
				panic(readErr)
			}

			log.Println("Received NACK")

			if _, _, err := rtcpReader.Read(rtcpBuf[:n], nil); err != nil {
				panic(err)
			}
		}
	}()

	// sequenceNumber is a uint16, so it wraps back to 0 after 65535.
	for sequenceNumber := uint16(0); ; sequenceNumber++ {
		// Send a RTP packet with a Payload of 0x0, 0x1, 0x2
		header := &rtp.Header{
			Version:        2,
			SSRC:           ssrc,
			SequenceNumber: sequenceNumber,
		}

		if _, err := streamWriter.Write(header, []byte{0x0, 0x1, 0x2}, nil); err != nil {
			// Use the log package like the rest of the file; a failed write
			// does not stop the sender.
			log.Println(err)
		}

		time.Sleep(time.Millisecond * 200)
	}
}