File: delay_based_bwe.go

package info (click to toggle)
golang-github-pion-interceptor 0.1.12-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bookworm-backports, forky, sid, trixie
  • size: 764 kB
  • sloc: makefile: 8
file content (103 lines) | stat: -rw-r--r-- 2,534 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package gcc

import (
	"sync"
	"time"

	"github.com/pion/interceptor/internal/cc"
	"github.com/pion/logging"
)

// DelayStats contains some internal statistics of the delay based congestion
// controller. Values of this type are passed to the callback registered with
// delayController.onUpdate.
//
// NOTE(review): the meaning and units of the individual fields are not
// established in this file; confirm against the components that populate
// them before relying on any particular interpretation.
type DelayStats struct {
	// Duration-typed statistics.
	Measurement      time.Duration
	Estimate         time.Duration
	Threshold        time.Duration
	LastReceiveDelta time.Duration

	// Usage and State use the package-private usage and state types, which
	// are declared elsewhere in the package.
	Usage         usage
	State         state
	// presumably bits per second — TODO confirm against the rate controller.
	TargetBitrate int
}

// now is the signature of a clock function returning a time.Time. It is the
// type of delayControllerConfig.nowFn.
type now func() time.Time

// delayController wires together the stages of the delay based bandwidth
// estimator and owns the goroutines that drive them. Instances are created
// with newDelayController and must be released with Close.
type delayController struct {
	// ackPipe is the send side of the channel consumed by the arrival group
	// accumulator goroutine started in newDelayController.
	ackPipe     chan<- []cc.Acknowledgment
	// ackRatePipe is the send side of the channel consumed by the rate
	// calculator goroutine started in newDelayController.
	ackRatePipe chan<- []cc.Acknowledgment

	// Embedded pipeline components; their methods and fields are promoted
	// onto delayController.
	*arrivalGroupAccumulator
	*rateController

	// onUpdateCallback, when non-nil, receives every DelayStats value that
	// the callback handed to the rate controller is invoked with.
	onUpdateCallback func(DelayStats)

	// wg tracks the goroutines started by newDelayController; Close waits
	// on it.
	wg sync.WaitGroup

	log logging.LeveledLogger
}

// delayControllerConfig bundles the parameters of newDelayController. All
// four fields are forwarded unchanged to newRateController.
type delayControllerConfig struct {
	// nowFn is the clock function handed to the rate controller.
	nowFn          now
	// Bitrate settings handed to the rate controller.
	// NOTE(review): units are not visible here, presumably bits per second —
	// confirm against newRateController.
	initialBitrate int
	minBitrate     int
	maxBitrate     int
}

// newDelayController builds the delay based estimation pipeline described by
// c and starts the two goroutines that drive it.
//
// Acknowledgments sent on the controller's ackPipe are read by an arrival
// group accumulator, which reports to a slope estimator, which reports to an
// overuse detector, which in turn reports to the rate controller.
// Acknowledgments sent on ackRatePipe are read by a rate calculator that
// reports to the same rate controller through onReceivedRate. The callback
// handed to the rate controller logs each DelayStats value and forwards it to
// the function registered with onUpdate, if one is set.
//
// Both channels are unbuffered. The returned controller must be shut down with
// Close, which closes the channels and waits for both goroutines.
func newDelayController(c delayControllerConfig) *delayController {
	ackPipe := make(chan []cc.Acknowledgment)
	ackRatePipe := make(chan []cc.Acknowledgment)

	// Locals deliberately avoid the names of the delayController,
	// rateController and arrivalGroupAccumulator types so that the types stay
	// usable (and readable) inside this function.
	d := &delayController{
		ackPipe:                 ackPipe,
		ackRatePipe:             ackRatePipe,
		arrivalGroupAccumulator: nil,
		rateController:          nil,
		onUpdateCallback:        nil,
		wg:                      sync.WaitGroup{},
		log:                     logging.NewDefaultLoggerFactory().NewLogger("gcc_delay_controller"),
	}

	// The rate controller's callback closes over d, so it can only be created
	// once d exists; the embedded pointer is filled in right afterwards.
	rateCtrl := newRateController(c.nowFn, c.initialBitrate, c.minBitrate, c.maxBitrate, func(ds DelayStats) {
		d.log.Infof("delaystats: %v", ds)
		if d.onUpdateCallback != nil {
			d.onUpdateCallback(ds)
		}
	})
	d.rateController = rateCtrl

	detector := newOveruseDetector(newAdaptiveThreshold(), 10*time.Millisecond, rateCtrl.onDelayStats)
	estimator := newSlopeEstimator(newKalman(), detector.onDelayStats)

	// Store the accumulator in the embedded field as well. Previously the
	// field stayed nil, so anything promoted from *arrivalGroupAccumulator
	// would have dereferenced a nil pointer when reached through d.
	accumulator := newArrivalGroupAccumulator()
	d.arrivalGroupAccumulator = accumulator

	rateCalc := newRateCalculator(500 * time.Millisecond)

	// Register both goroutines before starting either so Close can wait on
	// them.
	d.wg.Add(2)
	go func() {
		defer d.wg.Done()
		accumulator.run(ackPipe, estimator.onArrivalGroup)
	}()
	go func() {
		defer d.wg.Done()
		rateCalc.run(ackRatePipe, rateCtrl.onReceivedRate)
	}()

	return d
}

// onUpdate registers f as the function that receives DelayStats updates,
// replacing any previously registered function. Passing nil disables
// forwarding, because the callback is only invoked when it is non-nil.
//
// NOTE(review): the field is written here without synchronization while it is
// read inside the callback handed to the rate controller — confirm that
// callers register f before acknowledgments are fed in.
func (d *delayController) onUpdate(f func(DelayStats)) {
	d.onUpdateCallback = f
}

// updateDelayEstimate hands the same batch of acknowledgments to both worker
// goroutines: first over ackPipe, then over ackRatePipe. newDelayController
// creates both channels unbuffered, so each send blocks until the
// corresponding receiver has taken the batch. Sending after Close panics,
// because the channels are closed by then.
func (d *delayController) updateDelayEstimate(acks []cc.Acknowledgment) {
	delayPipe, ratePipe := d.ackPipe, d.ackRatePipe

	delayPipe <- acks
	ratePipe <- acks
}

// Close shuts the controller down: it closes ackPipe and ackRatePipe, in that
// order, and then blocks until the goroutines registered on wg have called
// Done. It always returns nil.
//
// Close must be called at most once; closing an already closed channel
// panics.
func (d *delayController) Close() error {
	// Deferred so the wait happens after both channels have been closed.
	defer d.wg.Wait()

	for _, pipe := range []chan<- []cc.Acknowledgment{d.ackPipe, d.ackRatePipe} {
		close(pipe)
	}

	return nil
}