1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295
|
package quic
import (
"slices"
"sync"
"time"
"github.com/quic-go/quic-go/internal/ackhandler"
"github.com/quic-go/quic-go/internal/flowcontrol"
"github.com/quic-go/quic-go/internal/protocol"
"github.com/quic-go/quic-go/internal/utils/ringbuffer"
"github.com/quic-go/quic-go/internal/wire"
"github.com/quic-go/quic-go/quicvarint"
)
const (
	// maxPathResponses is the maximum number of PATH_RESPONSE frames that are kept queued.
	// Once the limit is reached, QueueControlFrame silently drops further PATH_RESPONSE frames.
	maxPathResponses = 256
	// maxControlFrames is the maximum number of control frames that QueueControlFrame keeps queued.
	// A frame that would exceed the limit is dropped, and the framer remembers that this happened
	// (see QueuedTooManyControlFrames).
	maxControlFrames = 16 << 10
)

// This is the largest possible size of a stream-related control frame
// (which is the RESET_STREAM frame).
// appendControlFrames stops asking streams for control frames once no more
// than this many bytes are left in the packet.
const maxStreamControlFrameSize = 25
// streamFrameGetter is implemented by streams that can produce STREAM frames.
type streamFrameGetter interface {
	// popStreamFrame is called with the number of bytes available for the frame and the QUIC version.
	// It returns:
	//   * the STREAM frame; its Frame field may be nil if no frame could be produced,
	//   * a STREAM_DATA_BLOCKED frame that needs to be sent, or nil,
	//   * whether the stream has more data to send after this call.
	popStreamFrame(protocol.ByteCount, protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame, bool)
}
// streamControlFrameGetter is implemented by streams that can produce
// stream-related control frames.
type streamControlFrameGetter interface {
	// getControlFrame returns the next control frame of the stream.
	// ok says if the returned frame is valid and should be sent.
	// hasMore says if the stream has further control frames to send after this one.
	getControlFrame(time.Time) (_ ackhandler.Frame, ok, hasMore bool)
}
// framer collects everything that is waiting to be packed into a packet:
// STREAM frames of active streams, stream-related control frames,
// other control frames and PATH_RESPONSE frames.
// Append moves them into the frame slices of a packet.
type framer struct {
	// mutex protects activeStreams and streamQueue.
	mutex sync.Mutex
	// activeStreams contains the streams that have STREAM data to send.
	activeStreams map[protocol.StreamID]streamFrameGetter
	// streamQueue is the order in which the streams get to send STREAM frames.
	// It may contain IDs of streams that were already removed from activeStreams.
	streamQueue ringbuffer.RingBuffer[protocol.StreamID]
	// streamsWithControlFrames contains the streams that have stream-related control frames to send.
	// Note that it is protected by controlFrameMutex, not by mutex.
	streamsWithControlFrames map[protocol.StreamID]streamControlFrameGetter
	// controlFrameMutex protects streamsWithControlFrames, controlFrames and pathResponses.
	// When both mutexes are needed, Append acquires controlFrameMutex before mutex.
	controlFrameMutex sync.Mutex
	// controlFrames are the queued control frames. appendControlFrames takes them from the end of the slice.
	controlFrames []wire.Frame
	// pathResponses are the queued PATH_RESPONSE frames, sent in the order they were queued.
	pathResponses []*wire.PathResponseFrame
	// connFlowController is asked by Append if the connection just became
	// blocked on connection-level flow control.
	connFlowController flowcontrol.ConnectionFlowController
	// queuedTooManyControlFrames is set by QueueControlFrame (while holding controlFrameMutex)
	// when a frame was dropped because controlFrames was full. It is never reset.
	queuedTooManyControlFrames bool
}
// newFramer creates a framer with empty queues.
// The connection flow controller is consulted when packing STREAM frames,
// in order to detect when the connection becomes blocked.
func newFramer(connFlowController flowcontrol.ConnectionFlowController) *framer {
	f := &framer{connFlowController: connFlowController}
	// Both maps are written to later on, so they must not be left nil.
	f.activeStreams = make(map[protocol.StreamID]streamFrameGetter)
	f.streamsWithControlFrames = make(map[protocol.StreamID]streamControlFrameGetter)
	return f
}
// HasData says if there is anything queued that could be packed into a packet:
// a stream in the stream queue, a stream with control frames,
// a control frame or a PATH_RESPONSE frame.
func (f *framer) HasData() bool {
	// The stream queue and the control frame state are guarded by different
	// mutexes. They are taken one after the other, never nested.
	f.mutex.Lock()
	streamsQueued := !f.streamQueue.Empty()
	f.mutex.Unlock()
	if streamsQueued {
		return true
	}

	f.controlFrameMutex.Lock()
	defer f.controlFrameMutex.Unlock()
	switch {
	case len(f.streamsWithControlFrames) > 0:
		return true
	case len(f.controlFrames) > 0:
		return true
	default:
		return len(f.pathResponses) > 0
	}
}
// QueueControlFrame queues a control frame for sending.
// PATH_RESPONSE frames are kept in a separate queue, limited to maxPathResponses entries.
// All other frames go into the control frame queue, limited to maxControlFrames entries.
// Frames that exceed the respective limit are dropped.
func (f *framer) QueueControlFrame(frame wire.Frame) {
	f.controlFrameMutex.Lock()
	defer f.controlFrameMutex.Unlock()

	switch fr := frame.(type) {
	case *wire.PathResponseFrame:
		// The limit should be high enough to never be hit in practice,
		// unless the peer is doing something malicious.
		// Frames in excess of the limit are dropped without further notice.
		if len(f.pathResponses) < maxPathResponses {
			f.pathResponses = append(f.pathResponses, fr)
		}
	default:
		// This is a hack: instead of returning an error, remember that the queue overflowed.
		// See QueuedTooManyControlFrames.
		if len(f.controlFrames) >= maxControlFrames {
			f.queuedTooManyControlFrames = true
			return
		}
		f.controlFrames = append(f.controlFrames, frame)
	}
}
// Append packs queued frames into a packet that has maxLen bytes of space left.
//
// Control frames are added to frames first (see appendControlFrames).
// The remaining space is then filled with STREAM frames, which are added to streamFrames.
// STREAM_DATA_BLOCKED and DATA_BLOCKED frames that are generated while doing so
// are packed into the same packet if they fit, and queued as control frames otherwise.
//
// It returns the extended frames and streamFrames slices,
// and the combined length of all frames that were added by this call.
func (f *framer) Append(
	frames []ackhandler.Frame,
	streamFrames []ackhandler.StreamFrame,
	maxLen protocol.ByteCount,
	now time.Time,
	v protocol.Version,
) ([]ackhandler.Frame, []ackhandler.StreamFrame, protocol.ByteCount) {
	// controlFrameMutex is held until the end of the packing, since blocked frames
	// that don't fit are appended to f.controlFrames further down.
	// Lock order: controlFrameMutex first, then mutex.
	f.controlFrameMutex.Lock()
	frames, controlFrameLen := f.appendControlFrames(frames, maxLen, now, v)
	maxLen -= controlFrameLen
	var lastFrame ackhandler.StreamFrame
	var streamFrameLen protocol.ByteCount
	f.mutex.Lock()
	// pop STREAM frames, until less than protocol.MinStreamFrameSize bytes are left in the packet
	// The number of iterations is fixed up front: streams that have more data are
	// re-queued at the end of the queue, and must not be visited a second time.
	numActiveStreams := f.streamQueue.Len()
	for i := 0; i < numActiveStreams; i++ {
		if protocol.MinStreamFrameSize > maxLen {
			break
		}
		sf, blocked := f.getNextStreamFrame(maxLen, v)
		if sf.Frame != nil {
			streamFrames = append(streamFrames, sf)
			maxLen -= sf.Frame.Length(v)
			lastFrame = sf
			streamFrameLen += sf.Frame.Length(v)
		}
		// If the stream just became blocked on stream flow control, attempt to pack the
		// STREAM_DATA_BLOCKED into the same packet.
		if blocked != nil {
			l := blocked.Length(v)
			// In case it doesn't fit, queue it for the next packet.
			// This bypasses the maxControlFrames limit enforced by QueueControlFrame.
			if maxLen < l {
				f.controlFrames = append(f.controlFrames, blocked)
				break
			}
			frames = append(frames, ackhandler.Frame{Frame: blocked})
			maxLen -= l
			controlFrameLen += l
		}
	}
	// The only way to become blocked on connection-level flow control is by sending STREAM frames.
	if isBlocked, offset := f.connFlowController.IsNewlyBlocked(); isBlocked {
		blocked := &wire.DataBlockedFrame{MaximumData: offset}
		l := blocked.Length(v)
		// In case it doesn't fit, queue it for the next packet.
		if maxLen >= l {
			frames = append(frames, ackhandler.Frame{Frame: blocked})
			controlFrameLen += l
		} else {
			f.controlFrames = append(f.controlFrames, blocked)
		}
	}
	// Release the locks in the reverse order of acquisition.
	f.mutex.Unlock()
	f.controlFrameMutex.Unlock()
	if lastFrame.Frame != nil {
		// account for the smaller size of the last STREAM frame
		// The last frame is sent without the data length field. Replace its
		// contribution to the total by the length without that field.
		streamFrameLen -= lastFrame.Frame.Length(v)
		lastFrame.Frame.DataLenPresent = false
		streamFrameLen += lastFrame.Frame.Length(v)
	}
	return frames, streamFrames, controlFrameLen + streamFrameLen
}
// appendControlFrames appends queued control frames to frames, using at most maxLen bytes.
// Frames are considered in this order:
//  1. a single PATH_RESPONSE frame,
//  2. stream-related control frames, obtained from the streams themselves,
//  3. all other queued control frames, taken from the end of the queue.
//
// It returns the extended slice and the total length of the frames that were added.
// The caller must hold controlFrameMutex.
func (f *framer) appendControlFrames(
	frames []ackhandler.Frame,
	maxLen protocol.ByteCount,
	now time.Time,
	v protocol.Version,
) ([]ackhandler.Frame, protocol.ByteCount) {
	var length protocol.ByteCount

	// Only a single PATH_RESPONSE is packed per packet, and it goes first.
	if len(f.pathResponses) > 0 {
		pr := f.pathResponses[0]
		if prLen := pr.Length(v); prLen <= maxLen {
			frames = append(frames, ackhandler.Frame{Frame: pr})
			length += prLen
			f.pathResponses = f.pathResponses[1:]
		}
	}

	// Stream-related control frames: keep asking a stream until it has nothing more to send.
streams:
	for id, str := range f.streamsWithControlFrames {
		for {
			// Stop entirely once the largest possible stream control frame might not fit anymore.
			if maxLen-length <= maxStreamControlFrameSize {
				break streams
			}
			cf, ok, hasMore := str.getControlFrame(now)
			if !hasMore {
				delete(f.streamsWithControlFrames, id)
			}
			if !ok {
				// Nothing to send for this stream right now, move on to the next one.
				break
			}
			frames = append(frames, cf)
			length += cf.Frame.Length(v)
			if !hasMore {
				break
			}
			// It is rare that a stream has more than one control frame to queue.
		}
	}

	// All other control frames are popped from the back of the queue, for as long as they fit.
	for n := len(f.controlFrames); n > 0; n = len(f.controlFrames) {
		last := f.controlFrames[n-1]
		lastLen := last.Length(v)
		if length+lastLen > maxLen {
			break
		}
		frames = append(frames, ackhandler.Frame{Frame: last})
		length += lastLen
		f.controlFrames = f.controlFrames[:n-1]
	}

	return frames, length
}
// QueuedTooManyControlFrames says if the control frame queue exceeded its maximum queue length.
// This is a hack.
// It is easier to implement than propagating an error return value in QueueControlFrame.
// The correct solution would be to queue frames with their respective structs.
// See https://github.com/quic-go/quic-go/issues/4271 for the queueing of stream-related control frames.
//
// The flag is set by QueueControlFrame while holding controlFrameMutex.
// It is read under the same mutex here, so that calling both methods from
// different goroutines is not a data race.
func (f *framer) QueuedTooManyControlFrames() bool {
	f.controlFrameMutex.Lock()
	defer f.controlFrameMutex.Unlock()
	return f.queuedTooManyControlFrames
}
// AddActiveStream registers a stream that has STREAM data to send
// and puts it at the end of the stream queue.
// It is a no-op if the stream is already registered.
func (f *framer) AddActiveStream(id protocol.StreamID, str streamFrameGetter) {
	f.mutex.Lock()
	defer f.mutex.Unlock()

	if _, alreadyActive := f.activeStreams[id]; alreadyActive {
		return
	}
	f.streamQueue.PushBack(id)
	f.activeStreams[id] = str
}
// AddStreamWithControlFrames registers a stream that has stream-related control frames to send.
// It is a no-op if the stream is already registered.
func (f *framer) AddStreamWithControlFrames(id protocol.StreamID, str streamControlFrameGetter) {
	f.controlFrameMutex.Lock()
	defer f.controlFrameMutex.Unlock()

	if _, alreadyQueued := f.streamsWithControlFrames[id]; alreadyQueued {
		return
	}
	f.streamsWithControlFrames[id] = str
}
// RemoveActiveStream is called when a stream completes.
// It only removes the stream from the set of active streams.
// The stream's ID is left in the stream queue, since removing it would require
// iterating over the ringbuffer. Stale IDs are skipped when STREAM frames are
// appended, by checking if the stream is still in activeStreams.
func (f *framer) RemoveActiveStream(id protocol.StreamID) {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	delete(f.activeStreams, id)
}
// getNextStreamFrame takes the next stream from the stream queue and asks it for a STREAM frame
// that fits into maxLen bytes.
// It returns the frame and, if the stream asked for one to be sent, a STREAM_DATA_BLOCKED frame.
// The Frame field of the returned STREAM frame can be nil:
//   - if the stream was removed after it was enqueued,
//   - if the stream was canceled after it said it had data,
//   - if the remaining size doesn't allow us to add another STREAM frame.
//
// The caller must hold f.mutex, and the stream queue must not be empty.
func (f *framer) getNextStreamFrame(maxLen protocol.ByteCount, v protocol.Version) (ackhandler.StreamFrame, *wire.StreamDataBlockedFrame) {
	id := f.streamQueue.PopFront()
	// A stream only ends up in the queue by enqueueing itself,
	// but it might have been removed from activeStreams in the meantime.
	str, stillActive := f.activeStreams[id]
	if !stillActive {
		return ackhandler.StreamFrame{}, nil
	}

	// The frame is always popped with the DataLen field set. For the last STREAM frame
	// in the packet that field is removed later, so we can pretend to have
	// that many more bytes available.
	budget := maxLen + protocol.ByteCount(quicvarint.Len(uint64(maxLen)))
	sf, blocked, hasMoreData := str.popStreamFrame(budget, v)
	if !hasMoreData {
		// no more data to send, the stream is not active anymore
		delete(f.activeStreams, id)
	} else {
		// the stream gets another turn, after all the streams currently queued
		f.streamQueue.PushBack(id)
	}
	return sf, blocked
}
// Handle0RTTRejection discards queued state:
//   - the stream queue and the set of active streams are emptied,
//   - flow control related control frames (MAX_DATA, MAX_STREAM_DATA, MAX_STREAMS,
//     DATA_BLOCKED, STREAM_DATA_BLOCKED and STREAMS_BLOCKED) are removed from the
//     control frame queue. All other control frames are kept, in their original order.
//
// Queued PATH_RESPONSE frames and streams with pending control frames are not touched.
// NOTE(review): judging by the name, this is called when 0-RTT was rejected — confirm against the caller.
func (f *framer) Handle0RTTRejection() {
	// Acquire the mutexes in the same order as Append does (controlFrameMutex, then mutex).
	// Taking them in the opposite order could deadlock if both methods ran concurrently.
	f.controlFrameMutex.Lock()
	defer f.controlFrameMutex.Unlock()
	f.mutex.Lock()
	defer f.mutex.Unlock()

	f.streamQueue.Clear()
	clear(f.activeStreams)

	f.controlFrames = slices.DeleteFunc(f.controlFrames, func(frame wire.Frame) bool {
		switch frame.(type) {
		case *wire.MaxDataFrame, *wire.MaxStreamDataFrame, *wire.MaxStreamsFrame,
			*wire.DataBlockedFrame, *wire.StreamDataBlockedFrame, *wire.StreamsBlockedFrame:
			return true
		default:
			return false
		}
	})
}
|