File: stream_flow_controller.go

package info (click to toggle)
golang-github-lucas-clemente-quic-go 0.54.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,312 kB
  • sloc: sh: 54; makefile: 7
file content (154 lines) | stat: -rw-r--r-- 4,830 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package flowcontrol

import (
	"fmt"
	"time"

	"github.com/quic-go/quic-go/internal/protocol"
	"github.com/quic-go/quic-go/internal/qerr"
	"github.com/quic-go/quic-go/internal/utils"
)

// streamFlowController is the flow controller for a single stream. It adds
// stream-specific state to the embedded baseFlowController and forwards the
// relevant accounting to the connection-level flow controller.
type streamFlowController struct {
	baseFlowController

	// streamID identifies the stream. In this file it is only used to build
	// error and log messages.
	streamID protocol.StreamID

	// connection is the connection-level flow controller. It is informed about
	// bytes sent, bytes read and newly received offsets on this stream.
	connection connectionFlowControllerI

	// receivedFinalOffset is set once UpdateHighestReceived was called with
	// final == true. From then on, highestReceived is treated as the final
	// offset of the stream, and no further window updates are handed out.
	receivedFinalOffset bool
}

// Compile-time check that *streamFlowController implements StreamFlowController.
var _ StreamFlowController = &streamFlowController{}

// NewStreamFlowController creates the flow controller for the stream streamID.
//
// receiveWindow initializes both the receive window and the receive window
// size, maxReceiveWindow initializes the maximum receive window size, and
// initialSendWindow initializes the send window.
// cfc has to implement connectionFlowControllerI as well; the unchecked type
// assertion panics if it doesn't.
func NewStreamFlowController(
	streamID protocol.StreamID,
	cfc ConnectionFlowController,
	receiveWindow protocol.ByteCount,
	maxReceiveWindow protocol.ByteCount,
	initialSendWindow protocol.ByteCount,
	rttStats *utils.RTTStats,
	logger utils.Logger,
) StreamFlowController {
	fc := &streamFlowController{
		streamID:   streamID,
		connection: cfc.(connectionFlowControllerI),
	}

	// Fill in the embedded base controller in place, so that it is never copied.
	base := &fc.baseFlowController
	base.rttStats = rttStats
	base.receiveWindow = receiveWindow
	base.receiveWindowSize = receiveWindow
	base.maxReceiveWindowSize = maxReceiveWindow
	base.sendWindow = initialSendWindow
	base.logger = logger

	return fc
}

// UpdateHighestReceived records that data up to offset was received on this
// stream. final marks offset as the final offset of the stream.
//
// A FinalSizeError transport error is returned if offset contradicts a final
// offset that is already known, or if a final offset is lower than an offset
// received earlier. A FlowControlError transport error is returned if
// checkFlowControlViolation reports a violation after raising highestReceived.
// Otherwise the newly received bytes are reported to the connection flow
// controller, and its result is returned. Offsets at or below the highest
// offset seen so far don't change highestReceived.
func (c *streamFlowController) UpdateHighestReceived(offset protocol.ByteCount, final bool, now time.Time) error {
	highest := c.highestReceived

	// With a known final offset, every later frame has to be consistent with it.
	if c.receivedFinalOffset {
		switch {
		case final && offset != highest:
			// A repeated final offset must be identical to the first one.
			msg := fmt.Sprintf("received inconsistent final offset for stream %d (old: %d, new: %d bytes)", c.streamID, highest, offset)
			return &qerr.TransportError{ErrorCode: qerr.FinalSizeError, ErrorMessage: msg}
		case offset > highest:
			// No data may arrive beyond the final offset.
			msg := fmt.Sprintf("received offset %d for stream %d, but final offset was already received at %d", offset, c.streamID, highest)
			return &qerr.TransportError{ErrorCode: qerr.FinalSizeError, ErrorMessage: msg}
		}
	}

	// The flag is set before the remaining checks, so it stays set even if one
	// of them returns an error.
	if final {
		c.receivedFinalOffset = true
	}

	switch {
	case offset == highest:
		// Nothing new.
		return nil
	case offset < highest:
		// A higher offset was seen before, which reordering can cause.
		// That is only acceptable if this isn't supposed to be the final offset.
		if !final {
			return nil
		}
		msg := fmt.Sprintf("received final offset %d for stream %d, but already received offset %d before", offset, c.streamID, highest)
		return &qerr.TransportError{ErrorCode: qerr.FinalSizeError, ErrorMessage: msg}
	}

	// offset > highest from here on.
	// The first data received on this stream starts flow-control auto-tuning.
	if highest == 0 {
		c.startNewAutoTuningEpoch(now)
	}
	increment := offset - c.highestReceived
	c.highestReceived = offset

	if c.checkFlowControlViolation() {
		msg := fmt.Sprintf("received %d bytes on stream %d, allowed %d bytes", offset, c.streamID, c.receiveWindow)
		return &qerr.TransportError{ErrorCode: qerr.FlowControlError, ErrorMessage: msg}
	}
	return c.connection.IncrementHighestReceived(increment, now)
}

// AddBytesRead accounts for n bytes having been read from the stream, on both
// the stream and the connection flow controller.
//
// hasStreamWindowUpdate is the result of shouldQueueWindowUpdate for this
// stream, hasConnWindowUpdate is whatever the connection flow controller's
// AddBytesRead returns.
func (c *streamFlowController) AddBytesRead(n protocol.ByteCount) (hasStreamWindowUpdate, hasConnWindowUpdate bool) {
	// Stream-level bookkeeping happens under the stream's mutex. The mutex is
	// released before calling into the connection flow controller.
	c.mutex.Lock()
	c.addBytesRead(n)
	streamUpdate := c.shouldQueueWindowUpdate()
	c.mutex.Unlock()

	connUpdate := c.connection.AddBytesRead(n)
	return streamUpdate, connUpdate
}

// Abandon marks all data received so far as read: bytesRead is advanced to
// highestReceived, and the bytes that were still unread are reported to the
// connection flow controller as read.
func (c *streamFlowController) Abandon() {
	c.mutex.Lock()
	outstanding := c.highestReceived - c.bytesRead
	c.bytesRead = c.highestReceived
	c.mutex.Unlock()

	// Nothing to report if everything had been read already.
	if outstanding <= 0 {
		return
	}
	c.connection.AddBytesRead(outstanding)
}

// AddBytesSent accounts for n bytes sent on this stream: first on the stream's
// own (embedded) flow controller, then on the connection flow controller.
func (c *streamFlowController) AddBytesSent(n protocol.ByteCount) {
	// Go through the embedded controller explicitly: c.AddBytesSent would recurse.
	stream := &c.baseFlowController
	stream.AddBytesSent(n)

	c.connection.AddBytesSent(n)
}

// SendWindowSize returns the smaller of the stream's own send window size and
// the send window size of the connection flow controller.
func (c *streamFlowController) SendWindowSize() protocol.ByteCount {
	streamWindow := c.baseFlowController.SendWindowSize()
	connWindow := c.connection.SendWindowSize()
	return min(streamWindow, connWindow)
}

// IsNewlyBlocked returns the first result of the embedded flow controller's
// IsNewlyBlocked. The second result is discarded.
func (c *streamFlowController) IsNewlyBlocked() bool {
	newlyBlocked, _ := c.baseFlowController.IsNewlyBlocked()
	return newlyBlocked
}

// shouldQueueWindowUpdate reports whether a window update should be queued
// for this stream. It doesn't take the mutex itself; AddBytesRead calls it
// while holding c.mutex.
func (c *streamFlowController) shouldQueueWindowUpdate() bool {
	// Once the final offset is known, hasWindowUpdate isn't even consulted.
	if c.receivedFinalOffset {
		return false
	}
	return c.hasWindowUpdate()
}

// GetWindowUpdate returns the value obtained from getWindowUpdate for this
// stream, or 0 if the final offset of the stream was already received.
//
// If getWindowUpdate increased the receive window size, the connection flow
// controller is asked to ensure a minimum window size of the new stream window
// size multiplied by protocol.ConnectionFlowControlMultiplier.
func (c *streamFlowController) GetWindowUpdate(now time.Time) protocol.ByteCount {
	// With the final offset known, the peer has no use for more flow control credit.
	if c.receivedFinalOffset {
		return 0
	}

	c.mutex.Lock()
	defer c.mutex.Unlock()

	sizeBefore := c.receiveWindowSize
	update := c.getWindowUpdate(now)
	if c.receiveWindowSize <= sizeBefore {
		// The window size wasn't enlarged, so the connection window needs no adjustment.
		return update
	}

	// Auto-tuning enlarged the stream window: make sure the connection window keeps up.
	c.logger.Debugf("Increasing receive flow control window for stream %d to %d", c.streamID, c.receiveWindowSize)
	minConnWindow := protocol.ByteCount(float64(c.receiveWindowSize) * protocol.ConnectionFlowControlMultiplier)
	c.connection.EnsureMinimumWindowSize(minConnWindow, now)
	return update
}