1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294
|
package flowcontrol
import (
"testing"
"time"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/qerr"
"github.com/quic-go/quic-go/internal/utils"
"github.com/stretchr/testify/require"
)
func TestStreamFlowControlReceiving(t *testing.T) {
fc := NewStreamFlowController(
42,
NewConnectionFlowController(
protocol.MaxByteCount,
protocol.MaxByteCount,
nil,
&utils.RTTStats{},
utils.DefaultLogger,
),
100,
protocol.MaxByteCount,
protocol.MaxByteCount,
&utils.RTTStats{},
utils.DefaultLogger,
)
require.NoError(t, fc.UpdateHighestReceived(50, false, time.Now()))
// duplicates are fine
require.NoError(t, fc.UpdateHighestReceived(50, false, time.Now()))
// reordering is fine
require.NoError(t, fc.UpdateHighestReceived(40, false, time.Now()))
require.NoError(t, fc.UpdateHighestReceived(60, false, time.Now()))
// exceeding the limit is not fine
err := fc.UpdateHighestReceived(101, false, time.Now())
var terr *qerr.TransportError
require.ErrorAs(t, err, &terr)
require.Equal(t, qerr.FlowControlError, terr.ErrorCode)
require.Equal(t, "received 101 bytes on stream 42, allowed 100 bytes", terr.ErrorMessage)
}
// TestStreamFlowControllerFinalOffset exercises the final-offset bookkeeping
// of a stream flow controller: repeating the same final offset is accepted,
// while any offset that contradicts an earlier one is reported as a
// qerr.FinalSizeError.
func TestStreamFlowControllerFinalOffset(t *testing.T) {
	// newFC builds a fresh controller for stream 42 with every window set to
	// protocol.MaxByteCount, so the window limits never get in the way.
	newFC := func() StreamFlowController {
		connFC := NewConnectionFlowController(
			protocol.MaxByteCount,
			protocol.MaxByteCount,
			nil,
			&utils.RTTStats{},
			utils.DefaultLogger,
		)
		return NewStreamFlowController(
			42,
			connFC,
			protocol.MaxByteCount,
			protocol.MaxByteCount,
			protocol.MaxByteCount,
			&utils.RTTStats{},
			utils.DefaultLogger,
		)
	}

	// requireFinalSizeError asserts that err is a *qerr.TransportError with
	// error code qerr.FinalSizeError and exactly the message wantMsg.
	requireFinalSizeError := func(t *testing.T, err error, wantMsg string) {
		t.Helper()
		var transportErr *qerr.TransportError
		require.ErrorAs(t, err, &transportErr)
		require.Equal(t, qerr.FinalSizeError, transportErr.ErrorCode)
		require.Equal(t, wantMsg, transportErr.ErrorMessage)
	}

	t.Run("duplicate final offset", func(t *testing.T) {
		streamFC := newFC()
		// Receiving an identical final offset more than once is valid.
		for i := 0; i < 2; i++ {
			require.NoError(t, streamFC.UpdateHighestReceived(50, true, time.Now()))
		}
	})

	t.Run("inconsistent final offset", func(t *testing.T) {
		streamFC := newFC()
		require.NoError(t, streamFC.UpdateHighestReceived(50, true, time.Now()))
		// A second, different final offset contradicts the first one.
		err := streamFC.UpdateHighestReceived(51, true, time.Now())
		require.Error(t, err)
		requireFinalSizeError(t, err, "received inconsistent final offset for stream 42 (old: 50, new: 51 bytes)")
	})

	t.Run("non-final offset past final offset", func(t *testing.T) {
		streamFC := newFC()
		require.NoError(t, streamFC.UpdateHighestReceived(50, true, time.Now()))
		// Regardless of ordering, an offset beyond the final offset is never acceptable.
		err := streamFC.UpdateHighestReceived(60, false, time.Now())
		requireFinalSizeError(t, err, "received offset 60 for stream 42, but final offset was already received at 50")
	})

	t.Run("final offset smaller than previous offset", func(t *testing.T) {
		streamFC := newFC()
		require.NoError(t, streamFC.UpdateHighestReceived(50, false, time.Now()))
		// Once an offset has been seen, a smaller final offset is invalid.
		err := streamFC.UpdateHighestReceived(40, true, time.Now())
		requireFinalSizeError(t, err, "received final offset 40 for stream 42, but already received offset 50 before")
	})
}
// TestStreamAbandoning checks that abandoning a stream on which data was
// received but never read causes the connection flow controller to produce
// a window update, whereas no update is due on either level beforehand.
func TestStreamAbandoning(t *testing.T) {
	conn := NewConnectionFlowController(
		100,
		protocol.MaxByteCount,
		nil,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)
	require.True(t, conn.UpdateSendWindow(300))

	stream := NewStreamFlowController(
		42,
		conn,
		60,
		protocol.MaxByteCount,
		100,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)

	// Receive 50 bytes including the final offset, without reading any of them:
	// neither the stream nor the connection has a window update to send.
	require.NoError(t, stream.UpdateHighestReceived(50, true, time.Now()))
	require.Zero(t, stream.GetWindowUpdate(time.Now()))
	require.Zero(t, conn.GetWindowUpdate(time.Now()))

	// Abandoning the stream marks all of its bytes as consumed,
	// which makes a connection-level window update available.
	stream.Abandon()
	want := protocol.ByteCount(150)
	got := conn.GetWindowUpdate(time.Now())
	require.Equal(t, want, got)
}
// TestStreamSendWindow walks the send window of a stream through two phases:
// first the stream-level limit is the binding one, then, after the stream
// limit is raised, the connection-level limit takes over. It also checks
// that IsNewlyBlocked only reports true once per stream-level block.
func TestStreamSendWindow(t *testing.T) {
	const (
		connLimit   = 300 // send limit of the connection flow controller
		streamLimit = 100 // initial send limit of the stream flow controller
	)

	conn := NewConnectionFlowController(
		protocol.MaxByteCount,
		protocol.MaxByteCount,
		nil,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)
	require.True(t, conn.UpdateSendWindow(connLimit))

	stream := NewStreamFlowController(
		42,
		conn,
		protocol.MaxByteCount,
		protocol.MaxByteCount,
		streamLimit,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)

	// Phase 1: the stream flow controller is the limiting factor.
	require.Equal(t, protocol.ByteCount(streamLimit), stream.SendWindowSize())
	stream.AddBytesSent(50)
	require.False(t, stream.IsNewlyBlocked())
	require.Equal(t, protocol.ByteCount(streamLimit-50), stream.SendWindowSize())
	stream.AddBytesSent(50)
	require.True(t, stream.IsNewlyBlocked())
	require.Zero(t, stream.SendWindowSize())
	// Still blocked, but this is no longer news.
	require.False(t, stream.IsNewlyBlocked())

	// Phase 2: raise only the stream limit, leaving the connection limit alone.
	// From here on the connection flow controller is the limiting factor.
	require.True(t, stream.UpdateSendWindow(1000))
	// An update that arrives out of order (with a lower limit) is ignored.
	require.False(t, stream.UpdateSendWindow(999))
	// The stream is unblocked again.
	require.False(t, stream.IsNewlyBlocked())
	require.Equal(t, protocol.ByteCount(connLimit-streamLimit), stream.SendWindowSize())
	stream.AddBytesSent(connLimit - streamLimit)
	require.Zero(t, stream.SendWindowSize())
	// Blocked once more, though not by stream-level flow control.
	require.False(t, stream.IsNewlyBlocked())
}
func TestStreamWindowUpdate(t *testing.T) {
fc := NewStreamFlowController(
42,
NewConnectionFlowController(
protocol.MaxByteCount,
protocol.MaxByteCount,
nil,
&utils.RTTStats{},
utils.DefaultLogger,
),
100,
100,
protocol.MaxByteCount,
&utils.RTTStats{},
utils.DefaultLogger,
)
require.Zero(t, fc.GetWindowUpdate(time.Now()))
hasStreamWindowUpdate, _ := fc.AddBytesRead(24)
require.False(t, hasStreamWindowUpdate)
require.Zero(t, fc.GetWindowUpdate(time.Now()))
// the window is updated when it's 25% filled
hasStreamWindowUpdate, _ = fc.AddBytesRead(1)
require.True(t, hasStreamWindowUpdate)
require.Equal(t, protocol.ByteCount(125), fc.GetWindowUpdate(time.Now()))
hasStreamWindowUpdate, _ = fc.AddBytesRead(24)
require.False(t, hasStreamWindowUpdate)
require.Zero(t, fc.GetWindowUpdate(time.Now()))
// the window is updated when it's 25% filled
hasStreamWindowUpdate, _ = fc.AddBytesRead(1)
require.True(t, hasStreamWindowUpdate)
require.Equal(t, protocol.ByteCount(150), fc.GetWindowUpdate(time.Now()))
// Receive the final offset.
// We don't need to send any more flow control updates.
require.NoError(t, fc.UpdateHighestReceived(100, true, time.Now()))
fc.AddBytesRead(50)
require.Zero(t, fc.GetWindowUpdate(time.Now()))
}
// TestStreamConnectionWindowUpdate checks that reading from a stream can make
// a connection-level window update due even though no stream-level update is
// due: the connection receive window (100) is much smaller than the stream
// receive window (1000).
func TestStreamConnectionWindowUpdate(t *testing.T) {
	conn := NewConnectionFlowController(
		100,
		protocol.MaxByteCount,
		nil,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)
	stream := NewStreamFlowController(
		42,
		conn,
		1000,
		protocol.MaxByteCount,
		protocol.MaxByteCount,
		&utils.RTTStats{},
		utils.DefaultLogger,
	)

	streamUpdateDue, connUpdateDue := stream.AddBytesRead(50)

	// Stream level: nothing to announce after 50 of 1000 bytes.
	require.False(t, streamUpdateDue)
	require.Zero(t, stream.GetWindowUpdate(time.Now()))

	// Connection level: 50 of 100 bytes is enough to announce an update.
	require.True(t, connUpdateDue)
	require.NotZero(t, conn.GetWindowUpdate(time.Now()))
}
// TestStreamWindowAutoTuning checks how the window updates announced by the
// stream and connection flow controllers evolve as data is read at different
// speeds relative to an RTT of 1 second. The test advances a local `now`
// value by hand instead of sleeping, so the order of the statements below
// matters: every assertion depends on the state left by the previous ones.
func TestStreamWindowAutoTuning(t *testing.T) {
// the RTT is 1 second
rttStats := &utils.RTTStats{}
rttStats.UpdateRTT(time.Second, 0)
require.Equal(t, time.Second, rttStats.SmoothedRTT())
connFC := NewConnectionFlowController(
150, // initial receive window
350, // max receive window
// NOTE(review): this callback always returns true; presumably it decides
// whether a window increase is allowed — confirm against the constructor.
func(size protocol.ByteCount) bool { return true },
rttStats,
utils.DefaultLogger,
)
fc := NewStreamFlowController(
42,
connFC,
100, // initial receive window (the first stream update below is 51+100)
399, // max receive window (the last stream update below is capped at 203+399)
protocol.MaxByteCount, // presumably the initial send window; not relevant for this test
rttStats,
utils.DefaultLogger,
)
now := time.Now()
require.NoError(t, fc.UpdateHighestReceived(100, false, now))
// data consumption is too slow, window size is not increased
// (2.5 seconds pass, i.e. 2.5 RTTs, before 51 bytes are read)
now = now.Add(2500 * time.Millisecond)
fc.AddBytesRead(51)
// one initial stream window size added
require.Equal(t, protocol.ByteCount(51+100), fc.GetWindowUpdate(now))
// one initial connection window size added
// NOTE(review): this is the only call to the unexported getWindowUpdate;
// every other call in this test uses the exported GetWindowUpdate — confirm this is intentional.
require.Equal(t, protocol.ByteCount(51+150), connFC.getWindowUpdate(now))
// data consumption is fast enough, window size is increased
// (another 51 bytes are read after 2 seconds, for a total of 102 bytes read)
now = now.Add(2 * time.Second)
fc.AddBytesRead(51)
// stream window size doubled to 200 bytes
require.Equal(t, protocol.ByteCount(102+2*100), fc.GetWindowUpdate(now))
// The connection window is now increased as well,
// so that we don't get blocked on connection level flow control:
// The increase is by 200 bytes * a connection factor of 1.5: 300 bytes.
require.Equal(t, protocol.ByteCount(102+300), connFC.GetWindowUpdate(now))
// data consumption is fast enough, window size is increased
// (101 more bytes are read after 2 seconds, for a total of 203 bytes read)
now = now.Add(2 * time.Second)
fc.AddBytesRead(101)
// stream window size increased again, but bumps into its maximum value
require.Equal(t, protocol.ByteCount(203+399), fc.GetWindowUpdate(now))
// the connection window is also increased, but it bumps into its maximum value
require.Equal(t, protocol.ByteCount(203+350), connFC.GetWindowUpdate(now))
}
|