1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383
|
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.HTTP2.H2.Sender (
frameSender,
) where
import Control.Concurrent.STM
import qualified Control.Exception as E
import Data.IORef (modifyIORef', readIORef, writeIORef)
import Data.IntMap.Strict (IntMap)
import Foreign.Ptr (minusPtr, plusPtr)
import Network.ByteOrder
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.IO
import Imports
import Network.HPACK (setLimitForEncoding, toTokenHeaderTable)
import Network.HTTP2.Frame
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.EncodeFrame
import Network.HTTP2.H2.HPACK
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window
----------------------------------------------------------------
-- | The next action for the sender loop, as chosen by the local
-- @dequeue@ transaction of 'frameSender'.
data Switch
    = C Control -- ^ An item read from the control queue.
    | O Output -- ^ An item read from the output queue.
    | Flush -- ^ Both queues were empty while the buffer offset was non-zero.
-- | Merge the peer's SETTINGS into 'peerSettings' and propagate a change
-- of SETTINGS_INITIAL_WINDOW_SIZE to the streams that already exist.
--
-- The initial window size is read before and after the merge. When the
-- two values differ, 'increaseStreamWindowSize' is applied with that
-- difference (which may be negative) to every stream returned by the odd
-- and the even stream tables.
updatePeerSettings :: Context -> SettingsList -> IO ()
updatePeerSettings Context{peerSettings, oddStreamTable, evenStreamTable} peerAlist = do
    before <- initialWindowSize <$> readIORef peerSettings
    modifyIORef' peerSettings (`fromSettingsList` peerAlist)
    after <- initialWindowSize <$> readIORef peerSettings
    -- FIXME: race condition
    -- 1) newOddStream reads the old peerSettings and
    --    inserts the stream into its table after the adjustment.
    -- 2) newOddStream reads the new peerSettings and
    --    inserts the stream into its table before the adjustment.
    let delta = after - before
    when (delta /= 0) $ do
        getOddStreams oddStreamTable >>= shiftAll delta
        getEvenStreams evenStreamTable >>= shiftAll delta
  where
    -- Apply the same window delta to every stream in the map.
    shiftAll :: WindowSize -> IntMap Stream -> IO ()
    shiftAll delta = mapM_ (`increaseStreamWindowSize` delta)
-- | Entry point of the sender. Labels the current thread via 'labelMe'
-- and then runs the sending loop with a buffer offset of 0. The loop
-- has no terminating branch of its own.
frameSender :: Context -> Config -> IO ()
frameSender
    ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit}
    Config{..} =
        labelMe "H2 sender" >> loop 0
where
----------------------------------------------------------------
-- Sender main loop. 'off' is the number of bytes already placed in the
-- write buffer that have not been flushed yet.
--
-- * Control item: flush what is pending, handle the item, restart at 0.
-- * Output item: write it, flush if the buffer is nearly full, and
--   continue from the resulting offset.
-- * Flush: flush what is pending and restart at 0.
loop :: Offset -> IO ()
loop off = do
    next <- atomically (dequeue off)
    case next of
        C ctl -> do
            flushN off
            control ctl
            loop 0
        O out -> do
            written <- outputAndSync out off
            remaining <- flushIfNecessary written
            loop remaining
        Flush -> do
            flushN off
            loop 0
-- Send the first 'n' bytes of the connection write buffer using
-- 'confSendAll'. Nothing is done when 'n' is 0.
flushN :: Offset -> IO ()
flushN n
    | n == 0 = return ()
    | otherwise = bufferIO confWriteBuffer n confSendAll
-- Flush the buffer when fewer than 512 bytes remain below the current
-- output buffer limit. Returns the offset to continue writing at:
-- the unchanged offset, or 0 after a flush.
flushIfNecessary :: Offset -> IO Offset
flushIfNecessary off = do
    buflim <- readIORef outputBufferLimit
    let enoughRoom = off <= buflim - 512
    if enoughRoom
        then return off
        else flushN off >> return 0
-- Pick the next thing to do in a single STM transaction.
--
-- Control items always win over output items. Output is only looked at
-- after 'waitConnectionWindowSize' returns. When both queues are empty,
-- 'Flush' is returned if the buffer offset is non-zero; otherwise the
-- transaction retries.
dequeue :: Offset -> STM Switch
dequeue off = do
    noControl <- isEmptyTQueue controlQ
    if not noControl
        then C <$> readTQueue controlQ
        else do
            -- FLOW CONTROL: WINDOW_UPDATE 0: send: respecting peer's limit
            waitConnectionWindowSize ctx
            noOutput <- isEmptyTQueue outputQ
            case (noOutput, off) of
                (False, _) -> O <$> readTQueue outputQ
                (True, 0) -> retry
                (True, _) -> return Flush
----------------------------------------------------------------
-- Copy each item into the buffer in order, feeding the position returned
-- by one 'copy' into the next. The final position is returned.
copyAll items buf = case items of
    [] -> return buf
    item : rest -> do
        buf' <- copy buf item
        copyAll rest buf'
-- Handle one control item. Called with off == 0, i.e. the write buffer
-- holds nothing pending.
--
-- The frames are copied to the start of the write buffer and flushed
-- immediately. If the item carries the peer's settings list, those
-- settings are then applied to the sender's state.
control :: Control -> IO ()
control (CFrames ms xs) = do
    end <- copyAll xs confWriteBuffer
    flushN (end `minusPtr` confWriteBuffer)
    forM_ ms $ \peerAlist -> do
        -- Peer SETTINGS_INITIAL_WINDOW_SIZE
        updatePeerSettings ctx peerAlist
        -- Peer SETTINGS_MAX_FRAME_SIZE: the output limit becomes the
        -- frame size plus its header, capped at the buffer size.
        forM_ (lookup SettingsMaxFrameSize peerAlist) $ \payloadLen ->
            writeIORef outputBufferLimit $
                min confBufferSize (payloadLen + frameHeaderLength)
        -- Peer SETTINGS_HEADER_TABLE_SIZE
        forM_ (lookup SettingsTokenHeaderTableSize peerAlist) $ \siz ->
            setLimitForEncoding siz encodeDynamicTable
----------------------------------------------------------------
-- INVARIANT
--
-- Both the stream window and the connection window are open.
----------------------------------------------------------------
-- Write one output item into the buffer and report the follow-up output
-- (if any) through the item's 'sync' callback. Returns the new offset.
--
-- Nothing is written when the stream is already half-closed (local).
-- Header items go through 'outputHeader'. All other items go through
-- 'output', limited by the smaller of the stream and connection windows.
--
-- Any exception raised while writing is passed to 'resetStream' with
-- 'InternalError', and the offset the call started with is returned.
outputAndSync :: Output -> Offset -> IO Offset
outputAndSync out@(Output strm otyp sync) off = E.handle onFailure $ do
    state <- readStreamState strm
    if isHalfClosedLocal state
        then return off
        else do
            (off', mout') <- case otyp of
                OHeader hdr mnext tlrmkr ->
                    outputHeader strm hdr mnext tlrmkr sync off
                _ -> do
                    sws <- getStreamWindowSize strm
                    cws <- getConnectionWindowSize ctx -- not 0
                    output out off (min cws sws)
            sync mout'
            return off'
  where
    onFailure e = resetStream strm InternalError e >> return off
-- React to an exception on a stream.
--
-- Asynchronous exceptions are re-thrown unchanged. For anything else the
-- stream is closed as 'ResetByMe' and an RST_STREAM frame with the given
-- error code is enqueued on the control queue.
resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
resetStream strm err e =
    if isAsyncException e
        then E.throwIO e
        else do
            closed ctx strm (ResetByMe e)
            enqueueControl controlQ $
                CFrames Nothing [resetFrame err (streamNumber strm)]
----------------------------------------------------------------
-- Write the header block of a stream (HEADERS plus any CONTINUATION
-- frames) at the given offset.
--
-- When there is no body producer ('mnext' is 'Nothing') the header
-- carries end-of-stream and the stream is moved to half-closed (local).
-- Otherwise an 'ONext' output is returned so the body gets sent later.
outputHeader
    :: Stream
    -> [Header]
    -> Maybe DynaNext
    -> TrailersMaker
    -> (Maybe Output -> IO ())
    -> Offset
    -> IO (Offset, Maybe Output)
outputHeader strm hdr mnext tlrmkr sync off0 = do
    -- Header frame and Continuation frame
    (ths, _) <- toTokenHeaderTable $ fixHeaders hdr
    written <- headerContinue (streamNumber strm) ths (isNothing mnext) off0
    -- halfClosedLocal calls closed which removes
    -- the stream from stream table.
    off <- flushIfNecessary written
    case mnext of
        Just next ->
            return (off, Just (Output strm (ONext next tlrmkr) sync))
        Nothing -> do
            -- endOfStream
            halfClosedLocal ctx strm Finished
            return (off, Nothing)
----------------------------------------------------------------
-- Write the next piece of a non-header output at offset 'off0'.
--
-- * 'ONext': ask the body producer for at most 'lim' bytes (and no more
--   than what fits below the output buffer limit), placed just after the
--   space reserved for the frame header, then let 'fillDataHeader'
--   finish the frame. If the producer cancels, the stream is reset and
--   the offset is left unchanged.
-- * 'OPush': write a PUSH_PROMISE frame and flush if the buffer is
--   nearly full.
--
-- Returns the new offset and the follow-up output, if any.
-- Any other output type is a programming error and raises 'error'.
output :: Output -> Offset -> WindowSize -> IO (Offset, Maybe Output)
output out@(Output strm (ONext curr tlrmkr) _) off0 lim = do
    -- Data frame payload
    buflim <- readIORef outputBufferLimit
    let payloadOff = off0 + frameHeaderLength
        datBuf = confWriteBuffer `plusPtr` payloadOff
        datBufSiz = buflim - payloadOff
    next <- curr datBuf (min datBufSiz lim)
    case next of
        Next datPayloadLen reqflush mnext -> do
            NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
            fillDataHeader
                strm
                off0
                datPayloadLen
                mnext
                tlrmkr'
                out
                reqflush
        CancelNext mErr -> do
            -- Stream cancelled
            --
            -- At this point, the headers have already been sent.
            -- Therefore, the stream cannot be in the 'Idle' state, so we
            -- are justified in sending @RST_STREAM@.
            --
            -- By the invariant on the 'outputQ', there are no other
            -- outputs for this stream already enqueued. Therefore, we can
            -- safely cancel it knowing that we won't try and send any
            -- more data frames on this stream.
            case mErr of
                Just err ->
                    resetStream strm InternalError err
                Nothing ->
                    resetStream strm Cancel (E.toException CancelledStream)
            return (off0, Nothing)
output (Output strm (OPush ths pid) _) off0 _lim = do
    -- Creating a push promise header
    -- Frame id should be associated stream id from the client.
    let sid = streamNumber strm
    len <- pushPromise pid sid ths off0
    off <- flushIfNecessary $ off0 + frameHeaderLength + len
    return (off, Nothing)
-- Not expected to be reached; fail with a message that locates the
-- problem instead of a bare 'undefined'.
output _ _ _ =
    error "Network.HTTP2.H2.Sender.output: output type is neither ONext nor OPush"
----------------------------------------------------------------
-- Encode a header list into the write buffer, starting at 'off0', as a
-- HEADERS frame followed by CONTINUATION frames for whatever did not fit.
-- Returns the offset just past the last frame written.
--
-- The first encoding attempt uses the room left after 'off0'. Every
-- further frame is written at the start of the buffer after flushing.
-- END_HEADERS is set on the frame that leaves no headers pending, and
-- END_STREAM is added to each frame's flags when 'endOfStream' is set.
--
-- Throws 'ConnectionErrorIsSent' with 'CompressionError' when an encoding
-- round makes no progress.
headerContinue :: StreamId -> TokenHeaderList -> Bool -> Offset -> IO Offset
headerContinue sid ths0 endOfStream off0 = do
    buflim <- readIORef outputBufferLimit
    let payloadOff = off0 + frameHeaderLength
        payloadBuf = confWriteBuffer `plusPtr` payloadOff
        payloadLim = buflim - payloadOff
    (rest, kvlen) <- hpackEncodeHeader ctx payloadBuf payloadLim ths0
    if kvlen /= 0
        then do
            let frameBuf = confWriteBuffer `plusPtr` off0
            fillFrameHeader FrameHeaders kvlen sid (flagsFor rest) frameBuf
            continue (payloadOff + kvlen) rest FrameContinuation
        else
            -- Nothing was encoded at this offset, so the HEADERS frame
            -- itself still has to be produced by 'continue'.
            continue off0 rest FrameHeaders
  where
    markEndOfStream
        | endOfStream = setEndStream
        | otherwise = id
    -- Flags for a frame, given the headers still pending after it.
    flagsFor remaining = markEndOfStream $ case remaining of
        [] -> setEndHeader defaultFlags
        _ -> defaultFlags
    continue :: Offset -> TokenHeaderList -> FrameType -> IO Offset
    continue off [] _ = return off
    continue off pending ft = do
        flushN off
        -- Now off is 0
        buflim <- readIORef outputBufferLimit
        let fragBuf = confWriteBuffer `plusPtr` frameHeaderLength
            fragLim = buflim - frameHeaderLength
        (pending', fragLen) <-
            hpackEncodeHeaderLoop ctx fragBuf fragLim pending
        when (pending == pending') $
            E.throwIO $
                ConnectionErrorIsSent CompressionError sid "cannot compress the header"
        fillFrameHeader ft fragLen sid (flagsFor pending') confWriteBuffer
        continue (frameHeaderLength + fragLen) pending' FrameContinuation
----------------------------------------------------------------
-- Finish a DATA frame whose payload has already been placed in the write
-- buffer right after the frame-header slot at 'off'.
--
-- Arguments: the stream, the offset of the frame, the payload length,
-- the next body producer (if any), the trailers maker, the output being
-- processed, and whether the producer asked for a flush.
--
-- Returns the new offset (0 if a flush was performed) and the follow-up
-- output, which is 'Nothing' once the body is finished.
fillDataHeader
    :: Stream
    -> Offset
    -> Int
    -> Maybe DynaNext
    -> (Maybe ByteString -> IO NextTrailersMaker)
    -> Output
    -> Bool
    -> IO (Offset, Maybe Output)
-- End of body: emit the last DATA frame and/or the trailers, then move
-- the stream to half-closed (local).
fillDataHeader
    strm@Stream{streamNumber}
    off
    datPayloadLen
    Nothing
    tlrmkr
    _
    reqflush = do
        Trailers trailers <- tlrmkr Nothing
        let hasTrailers = not (null trailers)
            -- END_STREAM goes on the DATA frame only when no trailers
            -- follow; the trailers carry it otherwise.
            flag
                | hasTrailers = defaultFlags
                | otherwise = setEndStream defaultFlags
        -- Avoid sending an empty data frame before trailers at the end
        -- of a stream
        afterData <-
            if hasTrailers && datPayloadLen == 0
                then return off
                else do
                    decreaseWindowSize ctx strm datPayloadLen
                    fillFrameHeader
                        FrameData
                        datPayloadLen
                        streamNumber
                        flag
                        (confWriteBuffer `plusPtr` off)
                    return (off + frameHeaderLength + datPayloadLen)
        afterTrailers <-
            if hasTrailers
                then do
                    (ths, _) <- toTokenHeaderTable trailers
                    headerContinue streamNumber ths True {- endOfStream -} afterData
                else return afterData
        halfClosedLocal ctx strm Finished
        if reqflush
            then flushN afterTrailers >> return (0, Nothing)
            else return (afterTrailers, Nothing)
-- More body to come but nothing produced this round: no frame is
-- written, only the follow-up output is prepared.
fillDataHeader
    _
    off
    0
    (Just next)
    tlrmkr
    out
    reqflush
        | reqflush = flushN off >> return (0, Just out')
        | otherwise = return (off, Just out')
      where
        out' = out{outputType = ONext next tlrmkr}
-- More body to come and some payload produced: write a plain DATA frame
-- and account for it in the flow-control windows.
fillDataHeader
    strm@Stream{streamNumber}
    off
    datPayloadLen
    (Just next)
    tlrmkr
    out
    reqflush = do
        fillFrameHeader
            FrameData
            datPayloadLen
            streamNumber
            defaultFlags
            (confWriteBuffer `plusPtr` off)
        decreaseWindowSize ctx strm datPayloadLen
        if reqflush
            then flushN end >> return (0, Just out')
            else return (end, Just out')
      where
        end = off + frameHeaderLength + datPayloadLen
        out' = out{outputType = ONext next tlrmkr}
----------------------------------------------------------------
-- Write a PUSH_PROMISE frame at offset 'off' of the write buffer.
--
-- 'pid' is the stream id put in the frame header (the associated stream
-- from the client) and 'sid' is the promised stream id, stored as the
-- first four bytes of the payload. The HPACK-encoded headers follow.
--
-- Returns the payload length (promised id plus header block), not
-- counting the frame header.
--
-- The room offered to the encoder is derived from 'outputBufferLimit',
-- like every other frame writer in this module, so the payload stays
-- within the limit set from the peer's SETTINGS_MAX_FRAME_SIZE rather
-- than within the raw buffer size.
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int
pushPromise pid sid ths off = do
    buflim <- readIORef outputBufferLimit
    let offsid = off + frameHeaderLength -- checkme
        bufsid = confWriteBuffer `plusPtr` offsid
    poke32 (fromIntegral sid) bufsid 0
    let offkv = offsid + 4
        bufkv = confWriteBuffer `plusPtr` offkv
        limkv = buflim - offkv
    -- NOTE(review): the first component is presumably the headers that
    -- did not fit; they are dropped here while END_HEADERS is still
    -- set. Confirm that this is acceptable.
    (_, kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
    let flag = setEndHeader defaultFlags -- No EndStream flag
        buf = confWriteBuffer `plusPtr` off
        len = kvlen + 4
    fillFrameHeader FramePushPromise len pid flag buf
    return len
----------------------------------------------------------------
{-# INLINE fillFrameHeader #-}
-- | Encode a frame header with the given type, payload length, stream id
-- and flags into the buffer, via 'encodeFrameHeaderBuf'.
fillFrameHeader :: FrameType -> Int -> StreamId -> FrameFlags -> Buffer -> IO ()
fillFrameHeader ftyp len sid flag buf =
    let header =
            FrameHeader
                { payloadLength = len
                , flags = flag
                , streamId = sid
                }
     in encodeFrameHeaderBuf ftyp header buf
|