File: Flow.hs

package info (click to toggle)
haskell-network-control 0.1.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 88 kB
  • sloc: haskell: 355; makefile: 2
file content (202 lines) | stat: -rw-r--r-- 6,358 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
{-# LANGUAGE RecordWildCards #-}

module Network.Control.Flow (
    -- * Flow control

    -- | This is based on the total approach of QUIC rather than
    --   the difference approach of HTTP\/2 because QUIC's approach is
    --   considered safer. Please refer to [Using HTTP\/3 Stream Limits in HTTP\/2](https://datatracker.ietf.org/doc/draft-thomson-httpbis-h2-stream-limits/) to understand why QUIC's approach is better, even though that document's topic is stream concurrency.

    -- ** Constants for flow control.
    defaultMaxStreams,
    defaultMaxStreamData,
    defaultMaxData,

    -- ** Flow control for sending
    TxFlow (..),
    newTxFlow,
    txWindowSize,
    WindowSize,

    -- ** Flow control for receiving
    RxFlow (..),
    newRxFlow,
    rxWindowSize,
    FlowControlType (..),
    maybeOpenRxWindow,
    checkRxLimit,
) where

import Data.Bits

-- | Default max streams. (64)
--
-- 'defaultMaxData' is derived from this value together with
-- 'defaultMaxStreamData'.
defaultMaxStreams :: Int
defaultMaxStreams = 64

-- | Default per-stream data limit: 256 KiB (262144 bytes).
defaultMaxStreamData :: Int
defaultMaxStreamData = 256 * 1024

-- | Default connection-wide data limit.
--
-- The value is @defaultMaxStreams * defaultMaxStreamData@, i.e. enough room
-- for every permitted stream to fill its own window at the same time. As a
-- result, streams that are not being handled at the moment cannot use up the
-- connection window on their own.
--
-- When choosing a smaller connection window, you __must__ make sure that,
-- whenever fewer streams are being handled concurrently than
-- 'defaultMaxStreams' permits, the unhandled streams are unable to exhaust
-- the connection window. Otherwise the entire system may deadlock.
defaultMaxData :: Int
defaultMaxData = defaultMaxStreams * defaultMaxStreamData

-- | Window size.
--
-- A plain alias for 'Int'. It is the type of the initial window passed to
-- 'newTxFlow' and 'newRxFlow', and of the results of 'txWindowSize' and
-- 'rxWindowSize'.
type WindowSize = Int

-- | Flow for sending.
--
-- Both fields are running totals rather than values relative to the current
-- window; the window itself is their difference (see 'txWindowSize').
--
-- @
-- -------------------------------------->
--        ^           ^
--     txfSent    txfLimit
--
--        |-----------| The size which this node can send
--        txWindowSize
-- @
data TxFlow = TxFlow
    { txfSent :: Int
    -- ^ The total size of sent data.
    , txfLimit :: Int
    -- ^ The total size of data which can be sent.
    }
    deriving (Eq, Show)

-- | Build the initial sending flow: nothing has been sent yet and the send
-- limit equals the given window size.
newTxFlow :: WindowSize -> TxFlow
newTxFlow win =
    TxFlow
        { txfSent = 0
        , txfLimit = win
        }

-- | The amount that may still be sent: 'txfLimit' minus 'txfSent'.
txWindowSize :: TxFlow -> WindowSize
txWindowSize flow = txfLimit flow - txfSent flow

-- | Flow for receiving.
--
-- The goal of 'RxFlow' is to ensure that our network peer does not send us data
-- faster than we can consume it. We therefore impose a maximum number of
-- unconsumed bytes that we are willing to receive from the peer, which we refer
-- to as the buffer size:
--
-- @
--                    rxfBufSize
--           |---------------------------|
-- -------------------------------------------->
--           ^              ^
--      rxfConsumed    rxfReceived
-- @
--
-- The peer does not know of course how many bytes we have consumed of the data
-- that they sent us, so they keep track of their own limit of how much data
-- they are allowed to send. We keep track of this limit also:
--
-- @
--                    rxfBufSize
--           |---------------------------|
-- -------------------------------------------->
--           ^              ^       ^
--      rxfConsumed    rxfReceived  |
--                               rxfLimit
-- @
--
-- Each time we receive data from the peer, we check that they do not exceed the
-- limit ('checkRxLimit'). When we consume data, we periodically send the peer
-- an update (known as a /window update/) of what their new limit is
-- ('maybeOpenRxWindow'). To decrease overhead, we only do this if the window
-- update is at least half the buffer size ('rxfBufSize').
data RxFlow = RxFlow
    { rxfBufSize :: Int
    -- ^ Maximum number of unconsumed bytes the peer can send us
    --
    -- See discussion above for details.
    , rxfConsumed :: Int
    -- ^ How much of the data that the peer has sent us have we consumed?
    --
    -- This is an absolute number: the total amount of bytes consumed over the
    -- lifetime of the connection or stream (i.e., not relative to the window).
    , rxfReceived :: Int
    -- ^ How much data have we received from the peer?
    --
    -- Like 'rxfConsumed', this is an absolute number.
    , rxfLimit :: Int
    -- ^ Current limit on how many bytes the peer is allowed to send us.
    --
    -- Like 'rxfConsumed', this is an absolute number.
    }
    deriving (Eq, Show)

-- | Build the initial receiving flow from an initial window size.
--
-- The window size is used both as the buffer size and as the first limit;
-- the consumed and received counters start at zero.
newRxFlow :: WindowSize -> RxFlow
newRxFlow win =
    RxFlow
        { rxfBufSize = win
        , rxfConsumed = 0
        , rxfReceived = 0
        , rxfLimit = win
        }

-- | The remaining receive window: 'rxfLimit' minus 'rxfReceived'.
--
-- In other words, how many more bytes the peer may send before it has to
-- wait for a window update. See 'RxFlow' for the full picture.
rxWindowSize :: RxFlow -> WindowSize
rxWindowSize flow = rxfLimit flow - rxfReceived flow

-- | The representation of window size update.
--
-- This selects which number 'maybeOpenRxWindow' reports when it decides that
-- a window update should be sent.
data FlowControlType
    = -- | HTTP\/2 style: the update is the increment, i.e. the difference
      --   between the new limit and the previous one.
      FCTWindowUpdate
    | -- | QUIC style: the update is the new absolute limit.
      FCTMaxData
    deriving (Eq, Show)

-- | Account for received data that has now been consumed.
--
-- The consumed counter is always advanced. In addition, when moving the limit
-- to @consumed total + buffer size@ would raise it by at least half of the
-- buffer size, the limit is moved and a window update is returned for the
-- peer; see 'RxFlow' for details.
maybeOpenRxWindow
    :: Int
    -- ^ Number of bytes that were just consumed.
    -> FlowControlType
    -> RxFlow
    -> (RxFlow, Maybe Int)
    -- ^ The updated flow, with 'Just' a value when the peer should be told
    --   about a new window: the increment for 'FCTWindowUpdate', the new
    --   absolute limit for 'FCTMaxData'.
maybeOpenRxWindow consumed fct flow
    -- Too small an increase to be worth announcing: only record consumption.
    | increment < halfBuffer = (flow{rxfConsumed = totalConsumed}, Nothing)
    | otherwise =
        ( flow
            { rxfConsumed = totalConsumed
            , rxfLimit = candidateLimit
            }
        , Just announced
        )
  where
    bufSize = rxfBufSize flow
    totalConsumed = rxfConsumed flow + consumed

    -- The limit we would advertise, and how far it lies beyond the current one.
    candidateLimit = totalConsumed + bufSize
    increment = candidateLimit - rxfLimit flow

    -- Smallest increment for which an update is sent: half the buffer size.
    halfBuffer = bufSize `unsafeShiftR` 1

    -- What the peer is told depends on the flow control flavour.
    announced = case fct of
        FCTMaxData -> candidateLimit
        FCTWindowUpdate -> increment

-- | Decide whether newly received data fits under the current limit.
--
-- When it fits, the received counter is advanced by the given size; when it
-- does not, the flow is returned unchanged.
checkRxLimit
    :: Int
    -- ^ Number of bytes that have just been received.
    -> RxFlow
    -> (RxFlow, Bool)
    -- ^ The resulting flow, and 'True' when the data is acceptable.
checkRxLimit received flow =
    if totalReceived > rxfLimit flow
        then (flow, False)
        else (flow{rxfReceived = totalReceived}, True)
  where
    -- Absolute number of bytes received once this data is counted.
    totalReceived = rxfReceived flow + received