File: interceptor.go

package info (click to toggle)
golang-github-pion-interceptor 0.1.42-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,052 kB
  • sloc: makefile: 8
file content (138 lines) | stat: -rw-r--r-- 4,092 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

// Package cc implements an interceptor for bandwidth estimation that can be
// used with different BandwidthEstimators.
package cc

import (
	"github.com/pion/interceptor"
	"github.com/pion/interceptor/pkg/gcc"
	"github.com/pion/rtcp"
)

// Option configures an Interceptor while it is being constructed. Options are
// applied in order by InterceptorFactory.NewInterceptor; a non-nil error aborts
// construction and is returned to the caller.
type Option func(*Interceptor) error

// BandwidthEstimatorFactory creates new BandwidthEstimators. It is invoked once
// per InterceptorFactory.NewInterceptor call; a non-nil error aborts the
// creation of that interceptor.
type BandwidthEstimatorFactory func() (BandwidthEstimator, error)

// BandwidthEstimator is the interface that is handed to a
// NewPeerConnectionCallback and can be used to query current bandwidth
// metrics and add feedback manually.
type BandwidthEstimator interface {
	// AddStream is called by Interceptor.BindLocalStream for each local
	// stream; the RTPWriter it returns is used in place of the given writer.
	AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter
	// WriteRTCP feeds RTCP packets to the estimator. Interceptor.BindRTCPReader
	// calls it with the packets parsed from every successful read.
	WriteRTCP([]rtcp.Packet, interceptor.Attributes) error
	// GetTargetBitrate returns the current target bitrate (presumably in bits
	// per second; confirm against the concrete implementation).
	GetTargetBitrate() int
	// OnTargetBitrateChange registers f; presumably f is invoked with the new
	// value whenever the target bitrate changes (confirm against the concrete
	// implementation).
	OnTargetBitrateChange(f func(bitrate int))
	// GetStats returns implementation-defined statistics keyed by name.
	GetStats() map[string]any
	// Close is called by Interceptor.Close when the interceptor is closed.
	Close() error
}

// NewPeerConnectionCallback is called with the BandwidthEstimator that was
// created for the PeerConnection with id.
type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator)

// InterceptorFactory is a factory for CC interceptors. Create it with the
// package-level NewInterceptor function.
type InterceptorFactory struct {
	// opts are applied, in order, to every Interceptor the factory creates.
	opts              []Option
	// bweFactory creates the BandwidthEstimator for each new Interceptor.
	bweFactory        func() (BandwidthEstimator, error)
	// addPeerConnection, if non-nil, is called with the id and estimator of
	// every Interceptor the factory creates. Set via OnNewPeerConnection.
	addPeerConnection NewPeerConnectionCallback
}

// NewInterceptor returns a new CC interceptor factory that applies opts to
// every interceptor it creates. When factory is nil, estimators are created
// with gcc.NewSendSideBWE. The returned error is always nil.
func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*InterceptorFactory, error) {
	result := &InterceptorFactory{
		opts:       opts,
		bweFactory: factory,
	}

	// No factory supplied: fall back to the send-side estimator from gcc.
	if result.bweFactory == nil {
		result.bweFactory = func() (BandwidthEstimator, error) {
			return gcc.NewSendSideBWE()
		}
	}

	return result, nil
}

// OnNewPeerConnection sets a callback that is called when a new CC interceptor
// is created. The callback receives the id passed to NewInterceptor and the
// interceptor's BandwidthEstimator. Calling it again replaces the previous
// callback; passing nil disables the notification.
func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
	f.addPeerConnection = cb
}

// NewInterceptor returns a new CC interceptor for the PeerConnection with id.
//
// It creates a BandwidthEstimator with the factory's estimator factory, wraps
// it in an Interceptor, applies every Option in order and finally invokes the
// OnNewPeerConnection callback (if one is set) with id and the estimator.
//
// An error from the estimator factory or from an Option is returned unchanged.
// When an Option fails, the estimator that was already created is closed
// before returning, because no caller ever receives a reference to it.
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
	bwe, err := f.bweFactory()
	if err != nil {
		return nil, err
	}
	interceptorInstance := &Interceptor{
		NoOp:      interceptor.NoOp{},
		estimator: bwe,
		feedback:  make(chan []rtcp.Packet),
		close:     make(chan struct{}),
	}

	for _, opt := range f.opts {
		if err := opt(interceptorInstance); err != nil {
			// The interceptor is discarded, so nobody else can close its
			// estimator. The Close error is intentionally dropped: the
			// Option error is the failure the caller needs to see.
			if interceptorInstance.estimator != nil {
				_ = interceptorInstance.estimator.Close()
			}

			return nil, err
		}
	}

	if f.addPeerConnection != nil {
		f.addPeerConnection(id, interceptorInstance.estimator)
	}

	return interceptorInstance, nil
}

// Interceptor is a congestion control interceptor. It forwards incoming RTCP
// packets to a BandwidthEstimator and lets that estimator wrap the RTP writer
// of every local stream. The estimator is Google Congestion Control (gcc)
// unless a different BandwidthEstimatorFactory was supplied.
type Interceptor struct {
	interceptor.NoOp
	// estimator receives the RTCP feedback and wraps local stream writers.
	estimator BandwidthEstimator
	// feedback and close are created by InterceptorFactory.NewInterceptor but
	// are not referenced by any method visible in this file.
	feedback  chan []rtcp.Packet
	close     chan struct{}
}

// BindRTCPReader wraps reader so that every successfully read RTCP batch is
// also parsed and handed to the bandwidth estimator. It is called once per
// sender/receiver, however this might change in the future. The returned
// reader is invoked once per packet batch.
//
// If reading, parsing or the estimator's WriteRTCP fails, the wrapped reader
// returns (0, nil, err). Otherwise it returns the byte count of the underlying
// read together with the attributes, which are never nil.
func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
	read := func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
		n, attrs, readErr := reader.Read(b, a)
		if readErr != nil {
			return 0, nil, readErr
		}

		// Parse from a private copy of the bytes read rather than from the
		// caller's buffer.
		raw := make([]byte, n)
		copy(raw, b[:n])

		if attrs == nil {
			attrs = make(interceptor.Attributes)
		}

		packets, parseErr := attrs.GetRTCPPackets(raw)
		if parseErr != nil {
			return 0, nil, parseErr
		}

		if writeErr := c.estimator.WriteRTCP(packets, attrs); writeErr != nil {
			return 0, nil, writeErr
		}

		return n, attrs, nil
	}

	return interceptor.RTCPReaderFunc(read)
}

// BindLocalStream lets you modify any outgoing RTP packets. It is called once
// per LocalStream. The returned method will be called once per RTP packet.
// The stream is registered with the bandwidth estimator, and the RTPWriter
// returned by the estimator's AddStream is used for the stream.
func (c *Interceptor) BindLocalStream(
	info *interceptor.StreamInfo, writer interceptor.RTPWriter,
) interceptor.RTPWriter {
	return c.estimator.AddStream(info, writer)
}

// Close closes the interceptor and the associated bandwidth estimator. It
// returns whatever error the estimator's Close reports.
func (c *Interceptor) Close() error {
	return c.estimator.Close()
}