// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
	"sync/atomic"
	"time"
)

// connInflow tracks connection-level flow control for data sent by the peer to us.
//
// There are four byte offsets of significance in the stream of data received from the peer,
// each greater than or equal to the previous:
//
//   - bytes read by the user
//   - bytes received from the peer
//   - limit sent to the peer in a MAX_DATA frame
//   - potential new limit to send to the peer
//
// We maintain a flow control window, so as bytes are read by the user
// the potential limit is extended correspondingly.
//
// We keep an atomic counter of bytes read by the user and not yet applied to the
// potential limit (credit). When this count grows large enough, we update the
// new limit to send and mark that we need to send a new MAX_DATA frame.
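//
// As a worked illustration (the numbers are assumed, not taken from a real
// configuration): with a 1000-byte window, once the peer has sent 600 bytes
// and the user has read 400 of them, the four offsets are:
//
//	bytes read by the user           =  400
//	bytes received from the peer     =  600
//	limit sent in MAX_DATA           = 1000
//	potential new limit (400 + 1000) = 1400
//
// The 400 bytes of credit are folded into the new limit when an update is
// sent. Whether that credit is large enough to trigger an update is decided
// by shouldUpdateFlowControl.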
type connInflow struct {
	sent      sentVal // set when we need to send a MAX_DATA update to the peer
	usedLimit int64   // total bytes sent by the peer; must not exceed sentLimit
	sentLimit int64   // last MAX_DATA sent to the peer
	newLimit  int64   // new MAX_DATA to send

	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
}

func (c *Conn) inflowInit() {
	// The initial MAX_DATA limit is sent as a transport parameter.
	c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
	c.streams.inflow.newLimit = c.streams.inflow.sentLimit
}

// handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
// We may extend the peer's flow control window.
//
// This is called indirectly by the user, via Read or CloseRead.
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
	if n == 0 {
		return
	}
	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
		// We should send a MAX_DATA update to the peer.
		// Record this on the Conn's main loop.
		c.sendMsg(func(now time.Time, c *Conn) {
			// A MAX_DATA update may have already happened, so check again.
			if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) {
				c.sendMaxDataUpdate()
			}
		})
	}
}

// handleStreamBytesReadOnLoop extends the peer's flow control window after
// data has been discarded due to a RESET_STREAM frame.
//
// This is called on the conn's loop.
func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
		c.sendMaxDataUpdate()
	}
}

func (c *Conn) sendMaxDataUpdate() {
	c.streams.inflow.sent.setUnsent()
	// Apply current credit to the limit.
	// We don't strictly need to do this here
	// since appendMaxDataFrame will do so as well,
	// but this avoids redundant trips down this path
	// if the MAX_DATA frame doesn't go out right away.
	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
}

func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
	return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
}

// handleStreamBytesReceived records that the peer has sent us stream data.
func (c *Conn) handleStreamBytesReceived(n int64) error {
	c.streams.inflow.usedLimit += n
	if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
		return localTransportError{
			code:   errFlowControl,
			reason: "stream exceeded flow control limit",
		}
	}
	return nil
}

// appendMaxDataFrame appends a MAX_DATA frame to the current packet.
//
// It returns true if no more frames need appending,
// false if it could not fit a frame in the current packet.
func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
	if c.streams.inflow.sent.shouldSendPTO(pto) {
		// Add any unapplied credit to the new limit now.
		c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
		if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
			return false
		}
		// Record the limit we just advertised. MAX_DATA carries an absolute
		// offset, not an increment, so assign rather than add.
		c.streams.inflow.sentLimit = c.streams.inflow.newLimit
		c.streams.inflow.sent.setSent(pnum)
	}
	return true
}

// ackOrLossMaxData records the fate of a MAX_DATA frame.
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
}

// connOutflow tracks connection-level flow control for data sent by us to the peer.
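//
// As a worked illustration (values assumed): after setMaxData(1000) and
// consume(300), avail reports 700. A later setMaxData(800) leaves the limit
// at 1000, since setMaxData never lowers it.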
type connOutflow struct {
	max  int64 // largest MAX_DATA received from peer
	used int64 // total bytes of STREAM data sent to peer
}

// setMaxData updates the connection-level flow control limit
// with the initial limit conveyed in transport parameters
// or an update from a MAX_DATA frame.
func (f *connOutflow) setMaxData(maxData int64) {
	f.max = max(f.max, maxData)
}

// avail returns the number of connection-level flow control bytes available.
func (f *connOutflow) avail() int64 {
	return f.max - f.used
}

// consume records consumption of n bytes of flow.
func (f *connOutflow) consume(n int64) {
	f.used += n
}