1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
package gcc
import (
"sync"
"time"
"github.com/pion/interceptor/internal/cc"
"github.com/pion/logging"
)
// DelayStats contains some internal statistics of the delay based congestion
// controller. A value of this type is logged and handed to the callback
// registered through delayController.onUpdate.
//
// NOTE(review): the code that populates these fields is not part of this file
// section; the per-field notes below are inferred from the field names and
// types only and should be confirmed against the rate controller.
type DelayStats struct {
	// Measurement is presumably the most recent raw delay measurement.
	Measurement time.Duration
	// Estimate is presumably the filtered delay estimate.
	Estimate time.Duration
	// Threshold is presumably the threshold the estimate is compared against.
	Threshold time.Duration
	// LastReceiveDelta is presumably the latest inter-arrival receive delta.
	LastReceiveDelta time.Duration
	// Usage holds a value of the package-internal usage type.
	Usage usage
	// State holds a value of the package-internal state type.
	State state
	// TargetBitrate is the target bitrate; unit presumably bits per second —
	// TODO confirm.
	TargetBitrate int
}
// now is a function that returns a time.Time. It is the type of
// delayControllerConfig.nowFn, which newDelayController forwards to
// newRateController (presumably so the clock can be replaced in tests —
// confirm).
type now func() time.Time
// delayController owns the channels, goroutine bookkeeping and callback of the
// delay based congestion control pipeline assembled by newDelayController.
type delayController struct {
	// ackPipe is the send side of the channel read by the goroutine running
	// arrivalGroupAccumulator.run.
	ackPipe chan<- []cc.Acknowledgment
	// ackRatePipe is the send side of the channel read by the goroutine
	// running the rate calculator.
	ackRatePipe chan<- []cc.Acknowledgment

	// Embedded pipeline components; their members are promoted onto
	// delayController.
	*arrivalGroupAccumulator
	*rateController

	// onUpdateCallback is optional; when nil, DelayStats are only logged.
	onUpdateCallback func(DelayStats)

	// wg tracks the two goroutines started by newDelayController so that
	// Close can wait for them.
	wg sync.WaitGroup

	log logging.LeveledLogger
}
// delayControllerConfig bundles the parameters of newDelayController. All four
// values are passed on unchanged to newRateController.
type delayControllerConfig struct {
	// nowFn supplies the current time to the rate controller.
	nowFn now
	// Bitrate settings; unit presumably bits per second — TODO confirm
	// against newRateController.
	initialBitrate int
	minBitrate     int
	maxBitrate     int
}
// newDelayController assembles the delay based congestion control pipeline and
// starts the two goroutines that feed it.
//
// The components are chained through callbacks: the arrival group accumulator
// reads acknowledgments from ackPipe and reports to the slope estimator, which
// reports to the overuse detector, which reports to the rate controller. A
// separate rate calculator reads acknowledgments from ackRatePipe and reports
// to the rate controller's onReceivedRate. Every DelayStats value the rate
// controller passes to its callback is logged and, if a callback was
// registered with onUpdate, forwarded to it.
//
// Both goroutines are registered on the controller's WaitGroup; Close closes
// the two channels and waits on that WaitGroup.
func newDelayController(c delayControllerConfig) *delayController {
	// Keep the bidirectional ends locally: the struct only stores the
	// send-only view, while the goroutines below need to receive.
	ackPipe := make(chan []cc.Acknowledgment)
	ackRatePipe := make(chan []cc.Acknowledgment)

	d := &delayController{
		ackPipe:     ackPipe,
		ackRatePipe: ackRatePipe,
		log:         logging.NewDefaultLoggerFactory().NewLogger("gcc_delay_controller"),
	}

	// The closure reads d.onUpdateCallback at call time, so a callback
	// registered after construction is still picked up.
	rateCtrl := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, func(ds DelayStats) {
		d.log.Infof("delaystats: %v", ds)
		if d.onUpdateCallback != nil {
			d.onUpdateCallback(ds)
		}
	})
	d.rateController = rateCtrl

	// NOTE(review): the meaning of the 10ms argument is defined by
	// newOveruseDetector, which is not visible here — confirm before tuning.
	detector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateCtrl.onDelayStats)
	estimator := newSlopeEstimator(newKalman(), detector.onDelayStats)

	// Store the accumulator in the struct as well, so the embedded pointer is
	// never left nil and promoted members cannot dereference nil.
	accumulator := newArrivalGroupAccumulator()
	d.arrivalGroupAccumulator = accumulator

	// NOTE(review): 500ms is presumably the rate calculator's averaging
	// window — confirm against newRateCalculator.
	rateCalc := newRateCalculator(500 * time.Millisecond)

	d.wg.Add(2)
	go func() {
		defer d.wg.Done()
		accumulator.run(ackPipe, estimator.onArrivalGroup)
	}()
	go func() {
		defer d.wg.Done()
		rateCalc.run(ackRatePipe, rateCtrl.onReceivedRate)
	}()

	return d
}
// onUpdate registers f as the callback that receives DelayStats values,
// replacing any previously registered callback. Passing nil removes the
// callback, in which case the stats are only logged.
//
// The assignment is not synchronized.
// NOTE(review): the callback is read from the closure handed to
// newRateController; presumably onUpdate must therefore be called before
// acknowledgments are fed in — confirm against callers.
func (d *delayController) onUpdate(f func(DelayStats)) {
	d.onUpdateCallback = f
}
// updateDelayEstimate hands the same batch of acknowledgments to both
// consumers: first through ackPipe, then through ackRatePipe.
//
// Each send blocks until the corresponding goroutine receives the batch. The
// slice itself is shared between both receivers, not copied. Calling this
// after Close panics, because both channels are closed by then.
func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) {
	// Order matters: ackPipe is served before ackRatePipe.
	pipes := [...]chan<- []cc.Acknowledgment{d.ackPipe, d.ackRatePipe}
	for _, pipe := range pipes {
		pipe <- acks
	}
}
// Close closes both acknowledgment channels and then blocks until the two
// goroutines registered on the WaitGroup have finished. It always returns nil.
//
// Close must be called at most once: closing an already closed channel
// panics.
// NOTE(review): the goroutines presumably return once their input channel is
// closed; that logic lives in the run methods, which are not visible here.
func (d *delayController) Close() error {
	close(d.ackPipe)
	close(d.ackRatePipe)

	// Wait only after both channels are closed so neither goroutine is left
	// blocked on a receive.
	d.wg.Wait()

	return nil
}
|