File: interceptor.go

package info (click to toggle)
golang-github-pion-interceptor 0.1.12-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, forky, sid, trixie
  • size: 764 kB
  • sloc: makefile: 8
file content (131 lines) | stat: -rw-r--r-- 3,913 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Package cc implements an interceptor for bandwidth estimation that can be
// used with different BandwidthEstimators.
package cc

import (
	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/gcc"
	"github.com/pion/rtcp"
)

// Option configures an Interceptor while it is being constructed. The options
// given to the package-level NewInterceptor are applied, in order, to every
// interceptor built by InterceptorFactory.NewInterceptor; an Option that
// returns a non-nil error aborts that construction and the error is returned
// to the caller.
type Option func(*Interceptor) error

// BandwidthEstimatorFactory creates a new BandwidthEstimator. It is invoked
// once for each interceptor built by InterceptorFactory.NewInterceptor; a
// non-nil error aborts that construction and is returned to the caller.
type BandwidthEstimatorFactory func() (BandwidthEstimator, error)

// BandwidthEstimator is the interface driven by the CC interceptor. One
// instance is created per interceptor by a BandwidthEstimatorFactory and is
// passed to the NewPeerConnectionCallback, which lets the application query
// bandwidth metrics and add feedback manually.
type BandwidthEstimator interface {
	// AddStream is called by Interceptor.BindLocalStream with the stream's
	// info and its RTP writer; the writer it returns is the one handed back
	// for that stream.
	AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter
	// WriteRTCP receives the RTCP packets parsed from each successful
	// incoming RTCP read, together with that read's attributes. A non-nil
	// error makes the read itself fail.
	WriteRTCP([]rtcp.Packet, interceptor.Attributes) error
	// GetTargetBitrate returns the current target bitrate.
	// NOTE(review): presumably in bits per second; confirm against the
	// estimator implementation.
	GetTargetBitrate() int
	// OnTargetBitrateChange registers f.
	// NOTE(review): presumably f is called with the new value whenever the
	// target bitrate changes; confirm against the estimator implementation.
	OnTargetBitrateChange(f func(bitrate int))
	// GetStats returns estimator statistics as a map; the keys and value
	// types are defined by the implementation.
	GetStats() map[string]interface{}
	// Close is called by Interceptor.Close.
	Close() error
}

// NewPeerConnectionCallback is called by InterceptorFactory.NewInterceptor
// each time an interceptor has been created successfully. It receives the id
// that was passed to NewInterceptor and the BandwidthEstimator used by the new
// interceptor.
type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator)

// InterceptorFactory is a factory for CC interceptors.
//
// opts holds the options applied to every interceptor the factory builds,
// bweFactory creates the BandwidthEstimator for each of them, and
// addPeerConnection is the optional callback set through OnNewPeerConnection
// (nil means no callback is invoked).
type InterceptorFactory struct {
	opts              []Option
	bweFactory        func() (BandwidthEstimator, error)
	addPeerConnection NewPeerConnectionCallback
}

// NewInterceptor builds an InterceptorFactory for CC interceptors.
//
// factory creates the BandwidthEstimator for each interceptor; when it is nil
// a factory that calls gcc.NewSendSideBWE with no options is used instead.
// opts are stored and applied to every interceptor the factory creates. The
// returned error is always nil.
func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*InterceptorFactory, error) {
	f := &InterceptorFactory{
		opts:       opts,
		bweFactory: factory,
	}
	if f.bweFactory == nil {
		// No estimator supplied: fall back to the send-side estimator from gcc.
		f.bweFactory = func() (BandwidthEstimator, error) {
			return gcc.NewSendSideBWE()
		}
	}
	return f, nil
}

// OnNewPeerConnection sets the callback that is invoked every time the
// factory creates a new CC interceptor. It replaces any callback set earlier;
// passing nil disables the notification. The field is assigned without any
// synchronization, so set it before the factory is used concurrently.
func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
	f.addPeerConnection = cb
}

// NewInterceptor returns a new CC interceptor for the PeerConnection with the
// given id.
//
// A fresh BandwidthEstimator is obtained from the factory's estimator factory,
// the factory's options are applied in order, and finally the callback set
// with OnNewPeerConnection (if any) is invoked with id and the estimator.
//
// An error from the estimator factory or from any option is returned as is.
// When an option fails, the estimator created for this call is closed before
// returning: the interceptor is never handed to the caller, so nothing else
// could call Close on it.
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	bwe, err := f.bweFactory()
	if err != nil {
		return nil, err
	}
	i := &Interceptor{
		NoOp:      interceptor.NoOp{},
		estimator: bwe,
		feedback:  make(chan []rtcp.Packet),
		close:     make(chan struct{}),
	}

	for _, opt := range f.opts {
		if err := opt(i); err != nil {
			if bwe != nil {
				// Best-effort cleanup: the option error is the one the
				// caller needs, so a failure to close is not reported.
				_ = bwe.Close()
			}
			return nil, err
		}
	}

	if f.addPeerConnection != nil {
		f.addPeerConnection(id, i.estimator)
	}
	return i, nil
}

// Interceptor is the congestion control interceptor. It embeds
// interceptor.NoOp and delegates to a BandwidthEstimator: incoming RTCP is
// passed to the estimator (BindRTCPReader), local streams are registered with
// it (BindLocalStream), and Close closes it.
//
// The feedback and close channels are allocated by
// InterceptorFactory.NewInterceptor; no method in this file reads from,
// writes to, or closes them.
type Interceptor struct {
	interceptor.NoOp
	estimator BandwidthEstimator
	feedback  chan []rtcp.Packet
	close     chan struct{}
}

// BindRTCPReader wraps reader so that every successfully read RTCP batch is
// also passed to the bandwidth estimator. It is called once per
// sender/receiver, however this might change in the future. The returned
// reader is called once per packet batch.
//
// The wrapper returns the byte count and attributes of the underlying read.
// If the underlying read, the RTCP parsing, or the estimator's WriteRTCP
// fails, it returns 0, nil attributes and that error.
func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
	read := func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
		n, attrs, err := reader.Read(b, a)
		if err != nil {
			return 0, nil, err
		}

		// Parse from a private copy of the payload rather than from b itself.
		// NOTE(review): presumably so the parsed packets do not alias the
		// caller's buffer; confirm against Attributes.GetRTCPPackets.
		payload := make([]byte, n)
		copy(payload, b[:n])

		// The underlying reader may not have supplied attributes.
		if attrs == nil {
			attrs = make(interceptor.Attributes)
		}

		packets, err := attrs.GetRTCPPackets(payload)
		if err != nil {
			return 0, nil, err
		}
		if err := c.estimator.WriteRTCP(packets, attrs); err != nil {
			return 0, nil, err
		}

		return n, attrs, nil
	}
	return interceptor.RTCPReaderFunc(read)
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once
// per LocalStream. The stream's info and writer are passed to the bandwidth
// estimator's AddStream, and the writer AddStream returns is the one returned
// here; that writer will be called once per RTP packet.
func (c *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
	return c.estimator.AddStream(info, writer)
}

// Close closes the interceptor by closing the associated bandwidth estimator
// and returns the estimator's Close error unchanged.
func (c *Interceptor) Close() error {
	return c.estimator.Close()
}