1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
|
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"context"
"log/slog"
"math"
"time"
)
// ccReno is the NewReno-based congestion controller defined in RFC 9002.
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7
type ccReno struct {
maxDatagramSize int
// Maximum number of bytes allowed to be in flight.
congestionWindow int
// Sum of size of all packets that contain at least one ack-eliciting
// or PADDING frame (i.e., any non-ACK frame), and have neither been
// acknowledged nor declared lost.
bytesInFlight int
// When the congestion window is below the slow start threshold,
// the controller is in slow start.
slowStartThreshold int
// The time the current recovery period started, or zero when not
// in a recovery period.
recoveryStartTime time.Time
// Accumulated count of bytes acknowledged in congestion avoidance.
congestionPendingAcks int
// When entering a recovery period, we are allowed to send one packet
// before reducing the congestion window. sendOnePacketInRecovery is
// true if we haven't sent that packet yet.
sendOnePacketInRecovery bool
// inRecovery is set when we are in the recovery state.
inRecovery bool
// underutilized is set if the congestion window is underutilized
// due to insufficient application data, flow control limits, or
// anti-amplification limits.
underutilized bool
// ackLastLoss is the sent time of the newest lost packet processed
// in the current batch.
ackLastLoss time.Time
// Data tracking the duration of the most recently handled sequence of
// contiguous lost packets. If this exceeds the persistent congestion duration,
// persistent congestion is declared.
//
// https://www.rfc-editor.org/rfc/rfc9002#section-7.6
persistentCongestion [numberSpaceCount]struct {
start time.Time // send time of first lost packet
end time.Time // send time of last lost packet
next packetNumber // one plus the number of the last lost packet
}
}
func newReno(maxDatagramSize int) *ccReno {
c := &ccReno{
maxDatagramSize: maxDatagramSize,
}
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.2-1
c.congestionWindow = min(10*maxDatagramSize, max(14720, c.minimumCongestionWindow()))
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.3.1-1
c.slowStartThreshold = math.MaxInt
for space := range c.persistentCongestion {
c.persistentCongestion[space].next = -1
}
return c
}
// canSend reports whether the congestion controller permits sending
// a maximum-size datagram at this time.
//
// "An endpoint MUST NOT send a packet if it would cause bytes_in_flight [...]
// to be larger than the congestion window [...]"
// https://www.rfc-editor.org/rfc/rfc9002#section-7-7
//
// For simplicity and efficiency, we don't permit sending undersized datagrams.
func (c *ccReno) canSend() bool {
if c.sendOnePacketInRecovery {
return true
}
return c.bytesInFlight+c.maxDatagramSize <= c.congestionWindow
}
// setUnderutilized indicates that the congestion window is underutilized.
//
// The congestion window is underutilized if bytes in flight is smaller than
// the congestion window and sending is not pacing limited; that is, the
// congestion controller permits sending data, but no data is sent.
//
// https://www.rfc-editor.org/rfc/rfc9002#section-7.8
func (c *ccReno) setUnderutilized(log *slog.Logger, v bool) {
if c.underutilized == v {
return
}
oldState := c.state()
c.underutilized = v
if logEnabled(log, QLogLevelPacket) {
logCongestionStateUpdated(log, oldState, c.state())
}
}
// packetSent indicates that a packet has been sent.
func (c *ccReno) packetSent(now time.Time, log *slog.Logger, space numberSpace, sent *sentPacket) {
if !sent.inFlight {
return
}
c.bytesInFlight += sent.size
if c.sendOnePacketInRecovery {
c.sendOnePacketInRecovery = false
}
}
// Acked and lost packets are processed in batches
// resulting from either a received ACK frame or
// the loss detection timer expiring.
//
// A batch consists of zero or more calls to packetAcked and packetLost,
// followed by a single call to packetBatchEnd.
//
// Acks may be reported in any order, but lost packets must
// be reported in strictly increasing order.
// packetAcked indicates that a packet has been newly acknowledged.
func (c *ccReno) packetAcked(now time.Time, sent *sentPacket) {
if !sent.inFlight {
return
}
c.bytesInFlight -= sent.size
if c.underutilized {
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.8
return
}
if sent.time.Before(c.recoveryStartTime) {
// In recovery, and this packet was sent before we entered recovery.
// (If this packet was sent after we entered recovery, receiving an ack
// for it moves us out of recovery into congestion avoidance.)
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.3.2
return
}
c.congestionPendingAcks += sent.size
}
// packetLost indicates that a packet has been newly marked as lost.
// Lost packets must be reported in increasing order.
func (c *ccReno) packetLost(now time.Time, space numberSpace, sent *sentPacket, rtt *rttState) {
// Record state to check for persistent congestion.
// https://www.rfc-editor.org/rfc/rfc9002#section-7.6
//
// Note that this relies on always receiving loss events in increasing order:
// All packets prior to the one we're examining now have either been
// acknowledged or declared lost.
isValidPersistentCongestionSample := (sent.ackEliciting &&
!rtt.firstSampleTime.IsZero() &&
!sent.time.Before(rtt.firstSampleTime))
if isValidPersistentCongestionSample {
// This packet either extends an existing range of lost packets,
// or starts a new one.
if sent.num != c.persistentCongestion[space].next {
c.persistentCongestion[space].start = sent.time
}
c.persistentCongestion[space].end = sent.time
c.persistentCongestion[space].next = sent.num + 1
} else {
// This packet cannot establish persistent congestion on its own.
// However, if we have an existing range of lost packets,
// this does not break it.
if sent.num == c.persistentCongestion[space].next {
c.persistentCongestion[space].next = sent.num + 1
}
}
if !sent.inFlight {
return
}
c.bytesInFlight -= sent.size
if sent.time.After(c.ackLastLoss) {
c.ackLastLoss = sent.time
}
}
// packetBatchEnd is called at the end of processing a batch of acked or lost packets.
func (c *ccReno) packetBatchEnd(now time.Time, log *slog.Logger, space numberSpace, rtt *rttState, maxAckDelay time.Duration) {
if logEnabled(log, QLogLevelPacket) {
oldState := c.state()
defer func() { logCongestionStateUpdated(log, oldState, c.state()) }()
}
if !c.ackLastLoss.IsZero() && !c.ackLastLoss.Before(c.recoveryStartTime) {
// Enter the recovery state.
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.3.2
c.recoveryStartTime = now
c.slowStartThreshold = c.congestionWindow / 2
c.congestionWindow = max(c.slowStartThreshold, c.minimumCongestionWindow())
c.sendOnePacketInRecovery = true
// Clear congestionPendingAcks to avoid increasing the congestion
// window based on acks in a frame that sends us into recovery.
c.congestionPendingAcks = 0
c.inRecovery = true
} else if c.congestionPendingAcks > 0 {
// We are in slow start or congestion avoidance.
c.inRecovery = false
if c.congestionWindow < c.slowStartThreshold {
// When the congestion window is less than the slow start threshold,
// we are in slow start and increase the window by the number of
// bytes acknowledged.
d := min(c.slowStartThreshold-c.congestionWindow, c.congestionPendingAcks)
c.congestionWindow += d
c.congestionPendingAcks -= d
}
// When the congestion window is at or above the slow start threshold,
// we are in congestion avoidance.
//
// RFC 9002 does not specify an algorithm here. The following is
// the recommended algorithm from RFC 5681, in which we increment
// the window by the maximum datagram size every time the number
// of bytes acknowledged reaches cwnd.
for c.congestionPendingAcks > c.congestionWindow {
c.congestionPendingAcks -= c.congestionWindow
c.congestionWindow += c.maxDatagramSize
}
}
if !c.ackLastLoss.IsZero() {
// Check for persistent congestion.
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.6
//
// "A sender [...] MAY use state for just the packet number space that
// was acknowledged."
// https://www.rfc-editor.org/rfc/rfc9002#section-7.6.2-5
//
// For simplicity, we consider each number space independently.
const persistentCongestionThreshold = 3
d := (rtt.smoothedRTT + max(4*rtt.rttvar, timerGranularity) + maxAckDelay) *
persistentCongestionThreshold
start := c.persistentCongestion[space].start
end := c.persistentCongestion[space].end
if end.Sub(start) >= d {
c.congestionWindow = c.minimumCongestionWindow()
c.recoveryStartTime = time.Time{}
rtt.establishPersistentCongestion()
}
}
c.ackLastLoss = time.Time{}
}
// packetDiscarded indicates that the keys for a packet's space have been discarded.
func (c *ccReno) packetDiscarded(sent *sentPacket) {
// https://www.rfc-editor.org/rfc/rfc9002#section-6.2.2-3
if sent.inFlight {
c.bytesInFlight -= sent.size
}
}
func (c *ccReno) minimumCongestionWindow() int {
// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.2-4
return 2 * c.maxDatagramSize
}
func logCongestionStateUpdated(log *slog.Logger, oldState, newState congestionState) {
if oldState == newState {
return
}
log.LogAttrs(context.Background(), QLogLevelPacket,
"recovery:congestion_state_updated",
slog.String("old", oldState.String()),
slog.String("new", newState.String()),
)
}
type congestionState string
func (s congestionState) String() string { return string(s) }
const (
congestionSlowStart = congestionState("slow_start")
congestionCongestionAvoidance = congestionState("congestion_avoidance")
congestionApplicationLimited = congestionState("application_limited")
congestionRecovery = congestionState("recovery")
)
func (c *ccReno) state() congestionState {
switch {
case c.inRecovery:
return congestionRecovery
case c.underutilized:
return congestionApplicationLimited
case c.congestionWindow < c.slowStartThreshold:
return congestionSlowStart
default:
return congestionCongestionAvoidance
}
}
|