-- Network.HTTP2.H2.Sender: the frame sender of an HTTP/2 connection.
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.HTTP2.H2.Sender (
frameSender,
fillBuilderBodyGetNext,
fillFileBodyGetNext,
fillStreamBodyGetNext,
runTrailersMaker,
) where
import Control.Concurrent.MVar (putMVar)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Data.IORef (modifyIORef', readIORef, writeIORef)
import Data.IntMap.Strict (IntMap)
import Foreign.Ptr (minusPtr, plusPtr)
import Network.ByteOrder
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports
import Network.HPACK (TokenHeaderList, setLimitForEncoding, toHeaderTable)
import Network.HTTP2.Frame
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.EncodeFrame
import Network.HTTP2.H2.File
import Network.HTTP2.H2.HPACK
import Network.HTTP2.H2.Manager hiding (start)
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window
----------------------------------------------------------------
-- | Builder output that could not be written in the current round and
--   has to be carried over to the next buffer fill.
data Leftover
-- | Nothing is pending.
= LZero
-- | A suspended buffer writer that must be resumed.
| LOne B.BufferWriter
-- | A strict chunk that must be copied first, followed by a suspended
--   buffer writer that must be resumed.
| LTwo ByteString B.BufferWriter
----------------------------------------------------------------
-- | Block (by retrying the STM transaction) until the given bounded
--   queue is non-empty.  Nothing is removed from the queue.
{-# INLINE waitStreaming #-}
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ isEmptyTBQueue tbq >>= checkSTM . not
-- | The next piece of work for the sender loop, as chosen by @dequeue@.
data Switch
-- | A control message read from the control queue.
= C Control
-- | An output read from the output queue.
| O (Output Stream)
-- | Both queues are empty but the buffer holds unsent bytes: flush them.
| Flush
-- | Exception handler of the sender: an 'HTTP2Error' is re-thrown
--   unchanged, anything else is re-thrown wrapped in 'BadThingHappen'.
wrapException :: E.SomeException -> IO ()
wrapException se = case E.fromException se of
    Just (e :: HTTP2Error) -> E.throwIO e
    Nothing -> E.throwIO $ BadThingHappen se
-- | Merge the peer's settings list into 'peerSettings' and, when
--   SETTINGS_INITIAL_WINDOW_SIZE changed, shift the send window of every
--   stream in both stream tables by the difference.
updatePeerSettings :: Context -> SettingsList -> IO ()
updatePeerSettings Context{peerSettings, oddStreamTable, evenStreamTable} peerAlist = do
    before <- currentInitialWindow
    modifyIORef' peerSettings (`fromSettingsList` peerAlist)
    after <- currentInitialWindow
    -- FIXME: race condition
    -- 1) newOddStream reads old peerSettings and
    --    insert it to its stream table after adjusting.
    -- 2) newOddStream reads new peerSettings and
    --    insert it to its stream table before adjusting.
    let delta = after - before
    when (delta /= 0) $ do
        getOddStreams oddStreamTable >>= adjustAll delta
        getEvenStreams evenStreamTable >>= adjustAll delta
  where
    -- Initial window size currently stored in the peer settings.
    currentInitialWindow = initialWindowSize <$> readIORef peerSettings

    -- Apply the same window delta to every stream of a table snapshot.
    adjustAll :: WindowSize -> IntMap Stream -> IO ()
    adjustAll delta = mapM_ (`increaseStreamWindowSize` delta)
-- | Entry point of the sender.  It starts the sender loop with an empty
--   write buffer (offset 0) and routes any exception raised by the loop
--   through 'wrapException'.
--
--   All helpers that follow are local to this definition (they live in
--   its @where@ clause) and therefore share @ctx@, the fields of
--   'Config' brought into scope by the wildcard, and @mgr@.
frameSender :: Context -> Config -> Manager -> IO ()
frameSender
ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit}
Config{..}
mgr = loop 0 `E.catch` wrapException
where
----------------------------------------------------------------
-- | The sender loop.  @off@ is the number of bytes already staged in
--   the write buffer but not yet flushed.
loop :: Offset -> IO ()
loop off =
    atomically (dequeue off) >>= \next -> case next of
        -- Control frames are written from offset 0, so flush first.
        C ctl -> do
            flushN off
            control ctl
            loop 0
        O out -> do
            off' <- outputOrEnqueueAgain out off
            flushIfNecessary off' >>= loop
        Flush -> do
            flushN off
            loop 0
-- | Hand the first @n@ bytes of the connection write buffer to
--   'confSendAll'.  Does nothing when @n@ is 0.
flushN :: Offset -> IO ()
flushN n = when (n /= 0) $ bufferIO confWriteBuffer n confSendAll
-- | Flush the buffer when fewer than 512 bytes remain below the current
--   output buffer limit.  Returns the offset to continue writing at:
--   0 after a flush, the unchanged offset otherwise.
flushIfNecessary :: Offset -> IO Offset
flushIfNecessary off = do
    buflim <- readIORef outputBufferLimit
    let nearlyFull = off > buflim - 512
    if nearlyFull
        then flushN off >> return 0
        else return off
-- | Choose the next piece of work.  Control messages always win.
--   Outputs are only taken after 'waitConnectionWindowSize' lets the
--   transaction through.  With both queues empty, ask for a flush if
--   bytes are staged (@off /= 0@), otherwise retry.
dequeue :: Offset -> STM Switch
dequeue off = do
    noControl <- isEmptyTQueue controlQ
    if not noControl
        then C <$> readTQueue controlQ
        else do
            -- FLOW CONTROL: WINDOW_UPDATE 0: send: respecting peer's limit
            waitConnectionWindowSize ctx
            noOutput <- isEmptyTQueue outputQ
            pick noOutput
  where
    pick noOutput
        | not noOutput = O <$> readTQueue outputQ
        | off == 0 = retrySTM
        | otherwise = return Flush
----------------------------------------------------------------
-- | Copy the byte strings one after another starting at @buf@ and
--   return the pointer just past the last copied byte.
copyAll bss buf = case bss of
    [] -> return buf
    b : rest -> copy buf b >>= copyAll rest
-- | Process one control message.
--
--   Must be called with an empty write buffer (offset 0): frames are
--   copied to the start of 'confWriteBuffer' and flushed immediately.
--
--   * 'CFinish' re-throws the carried exception.
--   * 'CGoaway' sends the frame, fills the 'MVar' and throws
--     'GoAwayIsSent'.
--   * 'CFrames' sends the frames and then, if a peer settings list is
--     attached, applies it.
control :: Control -> IO ()
control ctl = case ctl of
    CFinish e -> E.throwIO e
    CGoaway bs mvar -> do
        end <- copyAll [bs] confWriteBuffer
        flushN (end `minusPtr` confWriteBuffer)
        putMVar mvar ()
        E.throwIO GoAwayIsSent
    CFrames ms xs -> do
        end <- copyAll xs confWriteBuffer
        flushN (end `minusPtr` confWriteBuffer)
        forM_ ms $ \peerAlist -> do
            -- Peer SETTINGS_INITIAL_WINDOW_SIZE
            updatePeerSettings ctx peerAlist
            -- Peer SETTINGS_MAX_FRAME_SIZE: never exceed our own buffer.
            forM_ (lookup SettingsMaxFrameSize peerAlist) $ \payloadLen ->
                writeIORef outputBufferLimit $
                    min confBufferSize (payloadLen + frameHeaderLength)
            -- Peer SETTINGS_HEADER_TABLE_SIZE
            forM_ (lookup SettingsHeaderTableSize peerAlist) $ \siz ->
                setLimitForEncoding siz encodeDynamicTable
----------------------------------------------------------------
-- | Serialize one output into the write buffer starting at @off0@ and
--   return the offset after the written bytes (0 if a flush happened).
--   @lim@ is the flow-control budget passed on to the body filler.
--
--   * 'ONext': fill one DATA frame payload, feed it to the trailers
--     maker and let 'fillDataHeaderEnqueueNext' finish the frame.
--   * 'OObj': write HEADERS (plus CONTINUATION) and, unless the body is
--     empty, turn the output into an 'ONext' and recurse.
--   * 'OPush': write a PUSH_PROMISE, then continue as 'OObj'.
--
--   Any other output type is a programming error; it is reported with a
--   descriptive message instead of a bare 'undefined'.
output :: Output Stream -> Offset -> WindowSize -> IO Offset
output out@(Output strm OutObj{} (ONext curr tlrmkr) _ sentinel) off0 lim = do
    -- Data frame payload
    buflim <- readIORef outputBufferLimit
    let payloadOff = off0 + frameHeaderLength
        datBuf = confWriteBuffer `plusPtr` payloadOff
        datBufSiz = buflim - payloadOff
    Next datPayloadLen reqflush mnext <- curr datBuf datBufSiz lim -- checkme
    NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
    fillDataHeaderEnqueueNext
        strm
        off0
        datPayloadLen
        mnext
        tlrmkr'
        sentinel
        out
        reqflush
output out@(Output strm (OutObj hdr body tlrmkr) OObj mtbq _) off0 lim = do
    -- Header frame and Continuation frame
    let sid = streamNumber strm
        endOfStream = case body of
            OutBodyNone -> True
            _ -> False
    (ths, _) <- toHeaderTable $ fixHeaders hdr
    off' <- headerContinue sid ths endOfStream off0
    -- halfClosedLocal calls closed which removes
    -- the stream from stream table.
    when endOfStream $ halfClosedLocal ctx strm Finished
    off <- flushIfNecessary off'
    case body of
        OutBodyNone -> return off
        OutBodyFile (FileSpec path fileoff bytecount) -> do
            (pread, sentinel') <- confPositionReadMaker path
            refresh <- case sentinel' of
                Closer closer -> timeoutClose mgr closer
                Refresher refresher -> return refresher
            let next = fillFileBodyGetNext pread fileoff bytecount refresh
                out' = out{outputType = ONext next tlrmkr}
            output out' off lim
        OutBodyBuilder builder -> do
            let next = fillBuilderBodyGetNext builder
                out' = out{outputType = ONext next tlrmkr}
            output out' off lim
        OutBodyStreaming _ ->
            output (setNextForStreaming mtbq tlrmkr out) off lim
        OutBodyStreamingUnmask _ ->
            output (setNextForStreaming mtbq tlrmkr out) off lim
output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
    -- Creating a push promise header
    -- Frame id should be associated stream id from the client.
    let sid = streamNumber strm
    len <- pushPromise pid sid ths off0
    off <- flushIfNecessary $ off0 + frameHeaderLength + len
    output out{outputType = OObj} off lim
-- Not expected to be reached: 'outputOrEnqueueAgain' deals with 'OWait'
-- before calling 'output'.
output _ _ _ =
    error "frameSender.output: unexpected output type"
----------------------------------------------------------------
-- | Turn an output with a streaming body into an 'ONext' output whose
--   filler pulls chunks (without blocking) from the stream's queue.
--
--   The queue is expected to be present for streaming bodies.  If it is
--   missing, the error is raised with an explicit message when the
--   filler first tries to read, rather than through 'fromJust'.
setNextForStreaming
    :: Maybe (TBQueue StreamingChunk)
    -> TrailersMaker
    -> Output Stream
    -> Output Stream
setNextForStreaming mtbq tlrmkr out =
    out{outputType = ONext (fillStreamBodyGetNext takeQ) tlrmkr}
  where
    tbq = case mtbq of
        Just q -> q
        Nothing ->
            error "setNextForStreaming: streaming body without a chunk queue"
    takeQ = atomically $ tryReadTBQueue tbq
----------------------------------------------------------------
-- | Either serialize the output now or park it until it can make
--   progress.  Returns the buffer offset after the call.
--
--   * A stream that is already half-closed (local) is skipped.
--   * 'OWait': a helper is forked that re-enqueues the output as 'OObj'
--     once the wait action completes.
--   * A streaming output whose queue is empty is parked until a chunk
--     arrives.
--   * An output whose stream send window is exhausted is parked until
--     the window opens again.
--
--   Any exception raised while doing so resets the stream: it is closed
--   with 'ResetByMe' and an RST_STREAM (InternalError) is queued.
outputOrEnqueueAgain :: Output Stream -> Offset -> IO Offset
outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
    state <- readStreamState strm
    if isHalfClosedLocal state
        then return off
        else case otyp of
            OWait wait -> do
                -- Checking if all push are done.
                forkAndEnqueueWhenReady wait outputQ out{outputType = OObj} mgr
                return off
            _ -> case mtbq of
                Just tbq -> checkStreaming tbq
                _ -> checkStreamWindowSize
  where
    mtbq = outputStrmQ out
    checkStreaming tbq = do
        isEmpty <- atomically $ isEmptyTBQueue tbq
        if isEmpty
            then do
                forkAndEnqueueWhenReady (waitStreaming tbq) outputQ out mgr
                return off
            else checkStreamWindowSize
    -- FLOW CONTROL: WINDOW_UPDATE: send: respecting peer's limit
    checkStreamWindowSize = do
        sws <- getStreamWindowSize strm
        -- The stream window can become negative: a smaller peer
        -- SETTINGS_INITIAL_WINDOW_SIZE is applied to open streams as a
        -- negative delta.  Treat that like an exhausted window so a
        -- negative limit is never passed down as a buffer size.
        if sws <= 0
            then do
                forkAndEnqueueWhenReady (waitStreamWindowSize strm) outputQ out mgr
                return off
            else do
                cws <- getConnectionWindowSize ctx -- not 0
                let lim = min cws sws
                output out off lim
    resetStream e = do
        closed ctx strm (ResetByMe e)
        let rst = resetFrame InternalError $ streamNumber strm
        enqueueControl controlQ $ CFrames Nothing [rst]
        return off
----------------------------------------------------------------
-- | Encode a header list as one HEADERS frame followed by as many
--   CONTINUATION frames as needed, starting at @off0@.  Returns the
--   offset after the last frame written.
--
--   If nothing fits behind @off0@, the buffer is flushed and the HEADERS
--   frame is started at the beginning of the buffer instead.
headerContinue :: StreamId -> TokenHeaderList -> Bool -> Offset -> IO Offset
headerContinue sid ths0 endOfStream off0 = do
    buflim <- readIORef outputBufferLimit
    let payloadStart = off0 + frameHeaderLength
        payloadBuf = confWriteBuffer `plusPtr` payloadStart
        payloadRoom = buflim - payloadStart
    (rest, kvlen) <- hpackEncodeHeader ctx payloadBuf payloadRoom ths0
    if kvlen /= 0
        then do
            let frameBuf = confWriteBuffer `plusPtr` off0
            fillFrameHeader FrameHeaders kvlen sid (flagsFor rest) frameBuf
            continue (payloadStart + kvlen) rest FrameContinuation
        else continue off0 rest FrameHeaders
  where
    -- END_HEADERS only when no header is left to encode; END_STREAM
    -- whenever the caller asked for it.
    flagsFor remaining =
        let base = case remaining of
                [] -> setEndHeader defaultFlags
                _ -> defaultFlags
         in if endOfStream then setEndStream base else base

    -- Emit the remaining headers, one full buffer per frame.
    continue :: Offset -> TokenHeaderList -> FrameType -> IO Offset
    continue off pending ft
        | null pending = return off
        | otherwise = do
            flushN off
            -- Now off is 0
            buflim <- readIORef outputBufferLimit
            let payloadBuf = confWriteBuffer `plusPtr` frameHeaderLength
                payloadRoom = buflim - frameHeaderLength
            (pending', kvlen') <-
                hpackEncodeHeaderLoop ctx payloadBuf payloadRoom pending
            -- No progress at all: a single header does not fit.
            when (pending == pending') $
                E.throwIO $
                    ConnectionErrorIsSent CompressionError sid "cannot compress the header"
            fillFrameHeader ft kvlen' sid (flagsFor pending') confWriteBuffer
            continue (frameHeaderLength + kvlen') pending' FrameContinuation
----------------------------------------------------------------
-- | Complete a DATA frame whose payload is already in the buffer and
--   decide what happens to the output afterwards.
--
--   * No next filler: this is the last DATA frame.  Trailers, if any,
--     are sent after it; otherwise the frame carries END_STREAM.  The
--     sentinel action is run and the stream becomes half-closed (local).
--   * Next filler but empty payload: nothing is written, the output is
--     simply re-enqueued.
--   * Next filler and payload: the frame header is written, the windows
--     are decreased and the output is re-enqueued.
--
--   In every case the buffer is flushed when @reqflush@ is set, and the
--   resulting offset is returned.
fillDataHeaderEnqueueNext
    :: Stream
    -> Offset
    -> Int
    -> Maybe DynaNext
    -> (Maybe ByteString -> IO NextTrailersMaker)
    -> IO ()
    -> Output Stream
    -> Bool
    -> IO Offset
fillDataHeaderEnqueueNext
    strm@Stream{streamNumber}
    off
    datPayloadLen
    mnext
    tlrmkr
    tell
    out
    reqflush =
        case mnext of
            Nothing -> lastFrame
            Just next
                | datPayloadLen == 0 -> do
                    requeue next
                    flushWhenRequested off
                | otherwise -> do
                    fillFrameHeader FrameData datPayloadLen streamNumber defaultFlags frameBuf
                    decreaseWindowSize ctx strm datPayloadLen
                    requeue next
                    flushWhenRequested frameEnd
      where
        frameBuf = confWriteBuffer `plusPtr` off
        frameEnd = off + frameHeaderLength + datPayloadLen

        -- Put the output back on the queue with its new filler.
        requeue next = enqueueOutput outputQ out{outputType = ONext next tlrmkr}

        -- Flush up to @end@ if the filler asked for it.
        flushWhenRequested end
            | reqflush = flushN end >> return 0
            | otherwise = return end

        lastFrame = do
            Trailers trailers <- tlrmkr Nothing
            let (mtrailers, flag)
                    | null trailers = (Nothing, setEndStream defaultFlags)
                    | otherwise = (Just trailers, defaultFlags)
            fillFrameHeader FrameData datPayloadLen streamNumber flag frameBuf
            end <- handleTrailers mtrailers frameEnd
            void tell
            halfClosedLocal ctx strm Finished
            decreaseWindowSize ctx strm datPayloadLen
            flushWhenRequested end

        handleTrailers Nothing off0 = return off0
        handleTrailers (Just trailers) off0 = do
            (ths, _) <- toHeaderTable trailers
            headerContinue streamNumber ths True {- endOfStream -} off0
----------------------------------------------------------------
-- | Write a PUSH_PROMISE frame at @off@ and return its payload length
--   (promised stream id plus header block).
--
--   @pid@ is the stream the frame is sent on, @sid@ the promised stream.
--
--   The header block is bounded by 'outputBufferLimit', like every other
--   frame written by the sender, so the frame respects the limit derived
--   from the peer's SETTINGS_MAX_FRAME_SIZE rather than only the size of
--   our own buffer.
--
--   NOTE(review): headers that do not fit are dropped (the leftover of
--   'hpackEncodeHeader' is ignored and END_HEADERS is always set);
--   confirm this is acceptable for pushed requests.
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int
pushPromise pid sid ths off = do
    buflim <- readIORef outputBufferLimit
    let offsid = off + frameHeaderLength -- checkme
        bufsid = confWriteBuffer `plusPtr` offsid
    poke32 (fromIntegral sid) bufsid 0
    let offkv = offsid + 4
        bufkv = confWriteBuffer `plusPtr` offkv
        limkv = buflim - offkv
    (_, kvlen) <- hpackEncodeHeader ctx bufkv limkv ths
    let flag = setEndHeader defaultFlags -- No EndStream flag
        buf = confWriteBuffer `plusPtr` off
        len = kvlen + 4
    fillFrameHeader FramePushPromise len pid flag buf
    return len
----------------------------------------------------------------
-- | Encode a frame header (type, payload length, stream id, flags)
--   into the given buffer position.
{-# INLINE fillFrameHeader #-}
fillFrameHeader :: FrameType -> Int -> StreamId -> FrameFlags -> Buffer -> IO ()
fillFrameHeader ftyp len sid flag =
    encodeFrameHeaderBuf
        ftyp
        FrameHeader{payloadLength = len, flags = flag, streamId = sid}
-- | Running trailers-maker: the @siz@ bytes at @buf@ are presented to
--   the trailers maker as a 'ByteString'.
--
-- > bufferIO buf siz $ \bs -> tlrmkr (Just bs)
runTrailersMaker :: TrailersMaker -> Buffer -> Int -> IO NextTrailersMaker
runTrailersMaker tlrmkr buf siz = bufferIO buf siz (tlrmkr . Just)
----------------------------------------------------------------
-- | First filler for a builder body: run the builder into the buffer,
--   limited by both the buffer size and the flow-control budget.
fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf siz lim =
    uncurry nextForBuilder <$> B.runBuilder bb buf (min siz lim)
-- | First filler for a file body: read at most
--   @min siz lim@ (and at most @bytecount@) bytes at @start@ into the
--   buffer.  The refresh action is not run here, only handed on to the
--   next filler.
fillFileBodyGetNext
    :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillFileBodyGetNext pread start bytecount refresh buf siz lim = do
    nread <- pread start (mini (min siz lim) bytecount) buf
    return $
        nextForFile
            (fromIntegral nread)
            pread
            (start + nread)
            (bytecount - nread)
            refresh
-- | First filler for a streaming body: drain chunks obtained from
--   @takeQ@ into the buffer, limited by buffer size and flow control.
fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ buf siz lim =
    toNext <$> runStreamBuilder buf (min siz lim) takeQ
  where
    toNext (cont, len, reqflush, leftover) =
        nextForStream cont len reqflush leftover takeQ
----------------------------------------------------------------
-- | Follow-up filler for a builder body: write what was left over from
--   the previous round.  Calling it with 'LZero' is a programming error.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = case leftover of
    LZero -> error "fillBufBuilder: LZero"
    LOne writer -> writer buf0 room >>= finish 0
    LTwo bs writer
        -- The pending chunk alone overflows the room: copy what fits
        -- and keep the rest as the next leftover.
        | pending > room -> do
            let (now, later) = BS.splitAt room bs
            void $ copy buf0 now
            return $ nextForBuilder room (B.Chunk later writer)
        -- The chunk fits: copy it, then resume the writer behind it.
        | otherwise -> do
            buf1 <- copy buf0 bs
            writer buf1 (room - pending) >>= finish pending
      where
        pending = BS.length bs
  where
    room = min siz0 lim
    finish copied (len, signal) = return $ nextForBuilder (copied + len) signal
-- | Translate a builder signal into a 'Next': a finished builder
--   requests a flush and has no follow-up; otherwise the follow-up
--   filler carries the pending writer (and chunk, if any).
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal = case signal of
    B.Done -> Next len True Nothing -- let's flush
    B.More _ writer -> Next len False $ Just (fillBufBuilder (LOne writer))
    B.Chunk bs writer -> Next len False $ Just (fillBufBuilder (LTwo bs writer))
----------------------------------------------------------------
-- | Pull streaming chunks with @takeQ@ and write them into the buffer
--   until the source yields nothing, a builder does not fit, a flush is
--   requested or the stream is finished.
runStreamBuilder
    :: Buffer
    -> BufferSize
    -> IO (Maybe StreamingChunk)
    -> IO
        ( Bool -- continue
        , BytesFilled
        , Bool -- require flushing
        , Leftover
        )
runStreamBuilder buf0 room0 takeQ = go buf0 room0 0
  where
    go buf room total =
        takeQ >>= \mchunk -> case mchunk of
            Nothing -> return (True, total, False, LZero)
            Just StreamingFlush -> return (True, total, True, LZero)
            Just (StreamingFinished dec) -> do
                dec
                return (False, total, True, LZero)
            Just (StreamingBuilder builder) -> do
                (len, signal) <- B.runBuilder builder buf room
                let total' = total + len
                case signal of
                    -- Builder fully written: try the next chunk.
                    B.Done -> go (buf `plusPtr` len) (room - len) total'
                    B.More _ writer -> return (True, total', False, LOne writer)
                    B.Chunk bs writer -> return (True, total', False, LTwo bs writer)
-- | Follow-up filler for a streaming body: first write what was left
--   over from the previous round, then keep draining chunks from
--   @takeQ@ while room remains.
fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = case leftover0 of
    LZero -> runStreamBuilder buf0 room0 takeQ >>= finish 0
    LOne writer -> resume writer buf0 room0 0
    LTwo bs writer
        -- The pending chunk alone overflows the room: copy what fits
        -- and keep the rest as the next leftover.
        | pending > room0 -> do
            let (now, later) = BS.splitAt room0 bs
            void $ copy buf0 now
            wrap True room0 False $ LTwo later writer
        | otherwise -> do
            buf1 <- copy buf0 bs
            resume writer buf1 (room0 - pending) pending
      where
        pending = BS.length bs
  where
    room0 = min siz0 lim0

    wrap :: Bool -> BytesFilled -> Bool -> Leftover -> IO Next
    wrap cont len reqflush l = return $ nextForStream cont len reqflush l takeQ

    -- Add the bytes written before 'runStreamBuilder' took over.
    finish already (cont, len, reqflush, leftover) =
        wrap cont (already + len) reqflush leftover

    -- Resume a suspended writer; @sofar@ bytes are already in the buffer.
    resume
        :: (Buffer -> BufferSize -> IO (Int, B.Next))
        -> Buffer
        -> BufferSize
        -> Int
        -> IO Next
    resume writer1 buf room sofar = do
        (len, signal) <- writer1 buf room
        let filled = sofar + len
        case signal of
            B.Done ->
                runStreamBuilder (buf `plusPtr` len) (room - len) takeQ
                    >>= finish filled
            B.More _ writer -> wrap True filled False $ LOne writer
            B.Chunk bs writer -> wrap True filled False $ LTwo bs writer
-- | Build the 'Next' for a streaming body.  When the stream continues,
--   the follow-up filler starts from the given leftover; when it does
--   not, there is no follow-up and the leftover is ignored.
nextForStream
    :: Bool
    -> BytesFilled
    -> Bool
    -> Leftover
    -> IO (Maybe StreamingChunk)
    -> Next
nextForStream cont len reqflush leftOrZero takeQ
    | cont = Next len reqflush $ Just (fillBufStream leftOrZero takeQ)
    | otherwise = Next len reqflush Nothing
----------------------------------------------------------------
-- | Follow-up filler for a file body: read the next part of the file
--   into the buffer, run the refresh action, and compute the next step.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
    nread <- pread start (mini (min siz lim) bytes) buf
    refresh
    return $
        nextForFile
            (fromIntegral nread)
            pread
            (start + nread)
            (bytes - nread)
            refresh
-- | Build the 'Next' for a file body from the number of bytes just
--   read and the bytes still to be sent.
nextForFile
    :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> IO () -> Next
nextForFile len pread start bytes refresh
    -- Nothing was read: finish and flush.
    | len == 0 = Next 0 True Nothing -- let's flush
    -- Everything has been read: finish without forcing a flush.
    | bytes == 0 = Next len False Nothing
    | otherwise = Next len False $ Just $ fillBufFile pread start bytes refresh
-- | Minimum of an 'Int' and an 'Int64', as an 'Int64'.
{-# INLINE mini #-}
mini :: Int -> Int64 -> Int64
mini i n = min (fromIntegral i) n