1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
|
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Window where
import Data.IORef
import Network.Control
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.EncodeFrame
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
getStreamWindowSize :: Stream -> IO WindowSize
getStreamWindowSize Stream{streamTxFlow} =
txWindowSize <$> readTVarIO streamTxFlow
getConnectionWindowSize :: Context -> IO WindowSize
getConnectionWindowSize Context{txFlow} =
txWindowSize <$> readTVarIO txFlow
waitStreamWindowSize :: Stream -> IO ()
waitStreamWindowSize Stream{streamTxFlow} = atomically $ do
w <- txWindowSize <$> readTVar streamTxFlow
checkSTM (w > 0)
waitConnectionWindowSize :: Context -> STM ()
waitConnectionWindowSize Context{txFlow} = do
w <- txWindowSize <$> readTVar txFlow
checkSTM (w > 0)
----------------------------------------------------------------
-- Receiving window update
increaseWindowSize :: StreamId -> TVar TxFlow -> WindowSize -> IO ()
increaseWindowSize sid tvar n = do
atomically $ modifyTVar' tvar $ \flow -> flow{txfLimit = txfLimit flow + n}
w <- txWindowSize <$> readTVarIO tvar
when (isWindowOverflow w) $ do
let msg = fromString ("window update for stream " ++ show sid ++ " is overflow")
err =
if isControl sid
then ConnectionErrorIsSent
else StreamErrorIsSent
E.throwIO $ err FlowControlError sid msg
increaseStreamWindowSize :: Stream -> WindowSize -> IO ()
increaseStreamWindowSize Stream{streamNumber, streamTxFlow} n =
increaseWindowSize streamNumber streamTxFlow n
increaseConnectionWindowSize :: Context -> Int -> IO ()
increaseConnectionWindowSize Context{txFlow} n =
increaseWindowSize 0 txFlow n
decreaseWindowSize :: Context -> Stream -> WindowSize -> IO ()
decreaseWindowSize Context{txFlow} Stream{streamTxFlow} siz = do
dec txFlow
dec streamTxFlow
where
dec tvar = atomically $ modifyTVar' tvar $ \flow -> flow{txfSent = txfSent flow + siz}
----------------------------------------------------------------
-- Sending window update
informWindowUpdate :: Context -> Stream -> Int -> IO ()
informWindowUpdate _ _ 0 = return ()
informWindowUpdate Context{controlQ, rxFlow} Stream{streamNumber, streamRxFlow} len = do
mxc <- atomicModifyIORef rxFlow $ maybeOpenRxWindow len FCTWindowUpdate
forM_ mxc $ \ws -> do
let frame = windowUpdateFrame 0 ws
cframe = CFrames Nothing [frame]
enqueueControl controlQ cframe
mxs <- atomicModifyIORef streamRxFlow $ maybeOpenRxWindow len FCTWindowUpdate
forM_ mxs $ \ws -> do
let frame = windowUpdateFrame streamNumber ws
cframe = CFrames Nothing [frame]
enqueueControl controlQ cframe
|