1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
|
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
// Package cc implements an interceptor for bandwidth estimation that can be
// used with different BandwidthEstimators.
package cc
import (
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/gcc"
"github.com/pion/rtcp"
)
// Option can be used to set initial options on CC interceptors.
type Option func(*Interceptor) error
// BandwidthEstimatorFactory creates new BandwidthEstimators.
type BandwidthEstimatorFactory func() (BandwidthEstimator, error)
// BandwidthEstimator is the interface that will be returned by a
// NewPeerConnectionCallback and can be used to query current bandwidth
// metrics and add feedback manually.
type BandwidthEstimator interface {
AddStream(*interceptor.StreamInfo, interceptor.RTPWriter) interceptor.RTPWriter
WriteRTCP([]rtcp.Packet, interceptor.Attributes) error
GetTargetBitrate() int
OnTargetBitrateChange(f func(bitrate int))
GetStats() map[string]any
Close() error
}
// NewPeerConnectionCallback returns the BandwidthEstimator for the
// PeerConnection with id.
type NewPeerConnectionCallback func(id string, estimator BandwidthEstimator)
// InterceptorFactory is a factory for CC interceptors.
type InterceptorFactory struct {
opts []Option
bweFactory func() (BandwidthEstimator, error)
addPeerConnection NewPeerConnectionCallback
}
// NewInterceptor returns a new CC interceptor factory.
func NewInterceptor(factory BandwidthEstimatorFactory, opts ...Option) (*InterceptorFactory, error) {
if factory == nil {
factory = func() (BandwidthEstimator, error) {
return gcc.NewSendSideBWE()
}
}
return &InterceptorFactory{
opts: opts,
bweFactory: factory,
addPeerConnection: nil,
}, nil
}
// OnNewPeerConnection sets a callback that is called when a new CC interceptor
// is created.
func (f *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
f.addPeerConnection = cb
}
// NewInterceptor returns a new CC interceptor.
func (f *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
bwe, err := f.bweFactory()
if err != nil {
return nil, err
}
interceptorInstance := &Interceptor{
NoOp: interceptor.NoOp{},
estimator: bwe,
feedback: make(chan []rtcp.Packet),
close: make(chan struct{}),
}
for _, opt := range f.opts {
if err := opt(interceptorInstance); err != nil {
return nil, err
}
}
if f.addPeerConnection != nil {
f.addPeerConnection(id, interceptorInstance.estimator)
}
return interceptorInstance, nil
}
// Interceptor implements Google Congestion Control.
type Interceptor struct {
interceptor.NoOp
estimator BandwidthEstimator
feedback chan []rtcp.Packet
close chan struct{}
}
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once
// per sender/receiver, however this might change in the future. The returned
// method will be called once per packet batch.
func (c *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
i, attr, err := reader.Read(b, a)
if err != nil {
return 0, nil, err
}
buf := make([]byte, i)
copy(buf, b[:i])
if attr == nil {
attr = make(interceptor.Attributes)
}
pkts, err := attr.GetRTCPPackets(buf[:i])
if err != nil {
return 0, nil, err
}
if err = c.estimator.WriteRTCP(pkts, attr); err != nil {
return 0, nil, err
}
return i, attr, nil
})
}
// BindLocalStream lets you modify any outgoing RTP packets. It is called once
// for per LocalStream. The returned method will be called once per rtp packet.
func (c *Interceptor) BindLocalStream(
info *interceptor.StreamInfo, writer interceptor.RTPWriter,
) interceptor.RTPWriter {
return c.estimator.AddStream(info, writer)
}
// Close closes the interceptor and the associated bandwidth estimator.
func (c *Interceptor) Close() error {
return c.estimator.Close()
}
|