1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
// Package stats provides an interceptor that records RTP/RTCP stream statistics
package stats
import (
"sync"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/rtp"
)
// Option can be used to configure the stats interceptor
type Option func(*Interceptor) error
// SetRecorderFactory sets the factory that is used to create new stats
// recorders for new streams
func SetRecorderFactory(f RecorderFactory) Option {
return func(i *Interceptor) error {
i.RecorderFactory = f
return nil
}
}
// SetNowFunc sets the function the interceptor uses to get a current timestamp.
// This is mostly useful for testing.
func SetNowFunc(now func() time.Time) Option {
return func(i *Interceptor) error {
i.now = now
return nil
}
}
// Getter returns the most recent stats of a stream
type Getter interface {
Get(ssrc uint32) *Stats
}
// NewPeerConnectionCallback receives a new StatsGetter for a newly created
// PeerConnection
type NewPeerConnectionCallback func(string, Getter)
// InterceptorFactory is a interceptor.Factory for a stats Interceptor
type InterceptorFactory struct {
opts []Option
addPeerConnection NewPeerConnectionCallback
}
// NewInterceptor creates a new InterceptorFactory
func NewInterceptor(opts ...Option) (*InterceptorFactory, error) {
return &InterceptorFactory{
opts: opts,
addPeerConnection: nil,
}, nil
}
// OnNewPeerConnection sets the callback that is called when a new
// PeerConnection is created.
func (r *InterceptorFactory) OnNewPeerConnection(cb NewPeerConnectionCallback) {
r.addPeerConnection = cb
}
// NewInterceptor creates a new Interceptor
func (r *InterceptorFactory) NewInterceptor(id string) (interceptor.Interceptor, error) {
i := &Interceptor{
NoOp: interceptor.NoOp{},
now: time.Now,
lock: sync.Mutex{},
RecorderFactory: func(ssrc uint32, clockRate float64) Recorder {
return newRecorder(ssrc, clockRate)
},
recorders: map[uint32]Recorder{},
wg: sync.WaitGroup{},
}
for _, opt := range r.opts {
if err := opt(i); err != nil {
return nil, err
}
}
if r.addPeerConnection != nil {
r.addPeerConnection(id, i)
}
return i, nil
}
// Recorder is the interface of a statistics recorder
type Recorder interface {
QueueIncomingRTP(ts time.Time, buf []byte, attr interceptor.Attributes)
QueueIncomingRTCP(ts time.Time, buf []byte, attr interceptor.Attributes)
QueueOutgoingRTP(ts time.Time, header *rtp.Header, payload []byte, attr interceptor.Attributes)
QueueOutgoingRTCP(ts time.Time, pkts []rtcp.Packet, attr interceptor.Attributes)
GetStats() Stats
Stop()
Start()
}
// RecorderFactory creates new Recorders to be used by the interceptor
type RecorderFactory func(ssrc uint32, clockRate float64) Recorder
// Interceptor is the interceptor that collects stream stats
type Interceptor struct {
interceptor.NoOp
now func() time.Time
lock sync.Mutex
RecorderFactory RecorderFactory
recorders map[uint32]Recorder
wg sync.WaitGroup
}
// Get returns the statistics for the stream with ssrc
func (r *Interceptor) Get(ssrc uint32) *Stats {
r.lock.Lock()
defer r.lock.Unlock()
if rec, ok := r.recorders[ssrc]; ok {
stats := rec.GetStats()
return &stats
}
return nil
}
func (r *Interceptor) getRecorder(ssrc uint32, clockRate float64) Recorder {
r.lock.Lock()
defer r.lock.Unlock()
if rec, ok := r.recorders[ssrc]; ok {
return rec
}
rec := r.RecorderFactory(ssrc, clockRate)
r.wg.Add(1)
go func() {
defer r.wg.Done()
rec.Start()
}()
r.recorders[ssrc] = rec
return rec
}
// Close closes the interceptor and associated stats recorders
func (r *Interceptor) Close() error {
defer r.wg.Wait()
r.lock.Lock()
defer r.lock.Unlock()
for _, r := range r.recorders {
r.Stop()
}
return nil
}
// BindRTCPReader lets you modify any incoming RTCP packets. It is called once per sender/receiver, however this might
// change in the future. The returned method will be called once per packet batch.
func (r *Interceptor) BindRTCPReader(reader interceptor.RTCPReader) interceptor.RTCPReader {
return interceptor.RTCPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
n, attattributes, err := reader.Read(bytes, attributes)
if err != nil {
return 0, attattributes, err
}
r.lock.Lock()
for _, recorder := range r.recorders {
recorder.QueueIncomingRTCP(r.now(), bytes[:n], attributes)
}
r.lock.Unlock()
return n, attattributes, err
})
}
// BindRTCPWriter lets you modify any outgoing RTCP packets. It is called once per PeerConnection. The returned method
// will be called once per packet batch.
func (r *Interceptor) BindRTCPWriter(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
return interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
r.lock.Lock()
for _, recorder := range r.recorders {
recorder.QueueOutgoingRTCP(r.now(), pkts, attributes)
}
r.lock.Unlock()
return writer.Write(pkts, attributes)
})
}
// BindLocalStream lets you modify any outgoing RTP packets. It is called once for per LocalStream. The returned method
// will be called once per rtp packet.
func (r *Interceptor) BindLocalStream(info *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
recorder := r.getRecorder(info.SSRC, float64(info.ClockRate))
return interceptor.RTPWriterFunc(func(header *rtp.Header, payload []byte, attributes interceptor.Attributes) (int, error) {
recorder.QueueOutgoingRTP(r.now(), header, payload, attributes)
return writer.Write(header, payload, attributes)
})
}
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (r *Interceptor) BindRemoteStream(info *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
recorder := r.getRecorder(info.SSRC, float64(info.ClockRate))
return interceptor.RTPReaderFunc(func(bytes []byte, attributes interceptor.Attributes) (int, interceptor.Attributes, error) {
n, attributes, err := reader.Read(bytes, attributes)
if err != nil {
return 0, nil, err
}
recorder.QueueIncomingRTP(r.now(), bytes[:n], attributes)
return n, attributes, nil
})
}
|