1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249
|
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.Server.Worker (
worker,
WorkerConf (..),
fromContext,
) where
import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Frame
import Network.HTTP2.H2
import Network.HTTP2.Server.Types
----------------------------------------------------------------
-- | Everything a 'worker' needs from its surroundings, abstracted over the
-- stream type @a@. 'fromContext' builds one for a real connection.
data WorkerConf a = WorkerConf
{ readInputQ :: IO (Input a)
-- ^ Obtain the next 'Input' (a stream and its request) to serve.
, writeOutputQ :: Output a -> IO ()
-- ^ Hand an 'Output' over for sending.
, workerCleanup :: a -> IO ()
-- ^ Run by 'worker' on the stream being served when serving ends with an
-- exception other than being killed by the thread manager.
, isPushable :: IO Bool
-- ^ Whether server push may be used right now; consulted before any
-- push stream is created.
, makePushStream :: a -> PushPromise -> IO (StreamId, a)
-- ^ Given the parent stream and a promise, return the stream id placed in
-- the 'OPush' output together with the new stream that carries the pushed
-- response.
, mySockAddr :: SockAddr
-- ^ Local address, passed to the server application through 'Aux'.
, peerSockAddr :: SockAddr
-- ^ Peer address, passed to the server application through 'Aux'.
}
-- | Build the 'WorkerConf' for a connection 'Context'.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} =
    WorkerConf
        { readInputQ = atomically . readTQueue . inputQ $ toServerInfo roleInfo
        , writeOutputQ = enqueueOutput outputQ
        , workerCleanup = abortStream
        , -- Driven by the peer's settings (enablePush).
          isPushable = fmap enablePush (readIORef peerSettings)
        , makePushStream = openPushed
        , mySockAddr = mySockAddr
        , peerSockAddr = peerSockAddr
        }
  where
    -- Mark the stream as killed and queue a reset frame
    -- (InternalError) for it on the control queue.
    abortStream strm = do
        closed ctx strm Killed
        enqueueControl controlQ $
            CFrames Nothing [resetFrame InternalError (streamNumber strm)]

    -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
    -- The promise itself is not inspected; the returned id is the
    -- parent stream's number.
    openPushed parent _promise = do
        (_, pushed) <- openEvenStreamWait ctx
        return (streamNumber parent, pushed)
----------------------------------------------------------------
-- | Enqueue one push 'Output' per 'PushPromise' and tell the caller how the
-- main response must be emitted.
--
-- Returns 'OObj' when nothing was pushed: the promise list is empty, the
-- peer does not allow push ('isPushable' is 'False'), or the request
-- carries no @:scheme@ or neither @:authority@ nor @Host@. Those headers are
-- needed to build the promised request, so in that case the pushes are
-- skipped instead of failing on a missing value.
--
-- Otherwise returns 'OWait' with an action that blocks until every enqueued
-- push output has run its completion action.
pushStream
    :: WorkerConf a
    -> a -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    -- Matching on 'Just' here resolves both header lookups up front, before
    -- any push stream is opened.
    case (,) <$> mscheme <*> mauth of
        Just (scheme, auth) | pushable -> do
            tvar <- newTVarIO 0
            mapM_ (push scheme auth tvar) pps0
            -- pps0 is non-empty here, so there is always something to wait for.
            return $ OWait (waiter (length pps0) tvar)
        _ -> return OObj
  where
    -- The request headers are the same for every promise: look them up once.
    mscheme = getHeaderValue tokenScheme reqvt
    mauth =
        getHeaderValue tokenAuthority reqvt
            <|> getHeaderValue tokenHost reqvt
    -- Completion action attached to each push output.
    increment tvar = atomically $ modifyTVar' tvar (+ 1)
    -- Blocks until the completion counter reaches the number of pushes.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        checkSTM (n >= lim)
    -- Open a push stream for one promise and enqueue its output.
    push scheme auth tvar pp = do
        (pid, newstrm) <- makePushStream pstrm pp
        let promiseRequest =
                [ (tokenMethod, H.methodGet)
                , (tokenScheme, scheme)
                , (tokenAuthority, auth)
                , (tokenPath, promiseRequestPath pp)
                ]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out
-- | The responder handed to the server application by 'worker'.
-- The application calls it with its 'Response' and any 'PushPromise's;
-- the response is wrapped in an 'Output' and enqueued with 'writeOutputQ'.
-- It also records, via the 'ThreadContinue', whether the calling worker
-- keeps serving afterwards.
response
    :: WorkerConf a
    -> Manager
    -> T.Handle
    -> ThreadContinue
    -> a
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
    case outObjBody rsp of
        -- No body: promises are not processed on this path.
        OutBodyNone -> do
            setThreadContinue tconf True
            enqueue OObj Nothing
        OutBodyBuilder _ -> pushThenEnqueue
        OutBodyFile _ -> pushThenEnqueue
        OutBodyStreaming body -> do
            otyp <- promised
            -- The streaming body runs inside this thread until it ends,
            -- so the thread stays occupied. Ask the manager for another
            -- worker to compensate ...
            spawnAction mgr
            -- ... and let this thread stop afterwards so the number of
            -- workers goes back down.
            setThreadContinue tconf False
            -- The body is an uncontrolled loop, so its chunks are
            -- serialized through a dedicated bounded queue.
            tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
            enqueue otyp (Just tbq)
            let emit chunk = atomically $ writeTBQueue tbq chunk
                -- The timeout handle is paused while waiting for room
                -- in the queue.
                sendBuilder b = do
                    T.pause th
                    emit (StreamingBuilder b)
                    T.resume th
                sendFlush = emit StreamingFlush
                sendFinished = emit $ StreamingFinished (decCounter mgr)
            incCounter mgr
            body sendBuilder sendFlush `E.finally` sendFinished
        OutBodyStreamingUnmask _ ->
            error "response: server does not support OutBodyStreamingUnmask"
  where
    (_, reqvt) = inpObjHeaders req
    -- Enqueue the pushes (if any) and learn how the main output must be sent.
    promised = pushStream wc strm reqvt pps
    -- Enqueue the main response with the given output type and optional
    -- streaming queue.
    enqueue otyp mtbq = writeOutputQ $ Output strm rsp otyp mtbq (return ())
    -- Shared path for builder and file bodies.
    pushThenEnqueue = do
        otyp <- promised
        setThreadContinue tconf True
        enqueue otyp Nothing
-- | Worker for server applications.
-- Repeatedly takes an input with 'readInputQ', runs the server application
-- on it, and loops for as long as both the exception outcome and the
-- 'ThreadContinue' flag allow.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ loop sinfo tcont
  where
    loop sinfo tcont th = do
        setThreadContinue tcont True
        outcome <- E.trySyncOrAsync $ serveOne sinfo tcont th
        survived <- either (recover sinfo) (\() -> return True) outcome
        wanted <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (survived && wanted) $ loop sinfo tcont th

    -- Serve exactly one request. The timeout handle is paused while
    -- waiting for input and resumed once a stream has been recorded.
    serveOne sinfo tcont th = do
        T.pause th
        Input strm req <- readInputQ
        let req' = pauseRequestBody req th
        setStreamInfo sinfo strm
        T.resume th
        T.tickle th
        let aux = Aux th mySockAddr peerSockAddr
        server (Request req') aux $ response wc mgr th tcont strm (Request req')

    -- Decide whether the worker survives an exception.
    recover sinfo e
        -- killed by the local worker manager: stop without cleanup
        | Just KilledByHttp2ThreadManager{} <- E.fromException e = return False
        -- anything else, including a kill by the local timeout manager
        -- (T.TimeoutThread): clean up the current stream and carry on
        | otherwise = do
            cleanup sinfo
            return True

    -- Wrap the request body reader so the timeout handle is paused
    -- for the duration of each read.
    pauseRequestBody req th = req{inpObjBody = pausedRead}
      where
        pausedRead = do
            T.pause th
            chunk <- inpObjBody req
            T.resume th
            return chunk

    -- Run 'workerCleanup' on the stream being served, if there is one.
    cleanup sinfo = getStreamInfo sinfo >>= maybe (return ()) workerCleanup
----------------------------------------------------------------
-- | A flag shared between a responder and the worker running it.
-- The responder stores its verdict in it and the worker reads it back
-- once the request has been handled: 'True' means the worker goes on
-- serving requests, 'False' means the worker finishes.
newtype ThreadContinue = ThreadContinue (IORef Bool)

-- | A fresh flag, initially 'True'.
{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue = fmap ThreadContinue (newIORef True)

-- | Overwrite the flag.
{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue flag) = writeIORef flag

-- | Read the current value of the flag.
{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue flag) = readIORef flag
----------------------------------------------------------------
-- | A mutable slot remembering the stream currently being served, if any,
-- so that it can be found again when cleaning up.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

-- | A fresh, empty slot.
{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = fmap StreamInfo (newIORef Nothing)

-- | Empty the slot.
{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo slot) = writeIORef slot Nothing

-- | Store a stream in the slot, replacing any previous one.
{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo slot) = writeIORef slot . Just

-- | Read the slot.
{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo slot) = readIORef slot
|