File: Worker.hs

package info (click to toggle)
haskell-http2 5.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 55,180 kB
  • sloc: haskell: 8,657; makefile: 5
file content (249 lines) | stat: -rw-r--r-- 8,819 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker,
    WorkerConf (..),
    fromContext,
) where

import Data.IORef
import qualified Network.HTTP.Types as H
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Frame
import Network.HTTP2.H2
import Network.HTTP2.Server.Types

----------------------------------------------------------------

-- | The operations a worker needs, abstracted over the stream type @a@.
--   'fromContext' builds the concrete configuration used with 'Stream'.
data WorkerConf a = WorkerConf
    { readInputQ :: IO (Input a)
    -- ^ Obtain the next 'Input' (a stream paired with its request)
    --   for the worker to serve.
    , writeOutputQ :: Output a -> IO ()
    -- ^ Hand an 'Output' over for sending.
    , workerCleanup :: a -> IO ()
    -- ^ Run by the worker on the stream it was serving when the
    --   server application was interrupted by an exception.
    , isPushable :: IO Bool
    -- ^ Whether server push may be used at the moment.
    , makePushStream :: a -> PushPromise -> IO (StreamId, a)
    -- ^ Given the parent stream and a promise, produce a stream
    --   identifier together with the stream that will carry the
    --   pushed response.
    , mySockAddr :: SockAddr
    -- ^ Local socket address; passed to the application via 'Aux'.
    , peerSockAddr :: SockAddr
    -- ^ Peer socket address; passed to the application via 'Aux'.
    }

-- | Build the 'WorkerConf' for real HTTP/2 'Stream's out of a connection
--   'Context'.
fromContext :: Context -> WorkerConf Stream
fromContext ctx@Context{..} =
    WorkerConf
        { readInputQ = nextInput
        , writeOutputQ = enqueueOutput outputQ
        , workerCleanup = abortStream
        , isPushable = pushEnabled
        , makePushStream = openPushStream
        , mySockAddr = mySockAddr
        , peerSockAddr = peerSockAddr
        }
  where
    -- Block until the next request arrives on the server-side input queue.
    nextInput = atomically (readTQueue (inputQ (toServerInfo roleInfo)))

    -- Mark the stream as killed and queue a reset frame carrying
    -- 'InternalError' for it.
    abortStream strm = do
        closed ctx strm Killed
        let rst = resetFrame InternalError (streamNumber strm)
        enqueueControl controlQ (CFrames Nothing [rst])

    -- Peer SETTINGS_ENABLE_PUSH
    pushEnabled = fmap enablePush (readIORef peerSettings)

    -- The promise itself is not consulted here. The returned identifier
    -- is the *parent* stream's number; the identifier produced when the
    -- new stream is opened is discarded.
    openPushStream parent _promise = do
        -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
        (_, pushed) <- openEvenStreamWait ctx
        return (streamNumber parent, pushed)

----------------------------------------------------------------

-- | Enqueue one pushed response per 'PushPromise' and report how the
--   parent response must be scheduled relative to them.
--
--   Returns 'OObj' when nothing is pushed, which happens when
--
--   * the promise list is empty,
--   * 'isPushable' reports 'False', or
--   * the request carries no @:scheme@ value, or neither an
--     @:authority@ nor a @Host@ value, so the promised request cannot
--     be constructed.
--
--   Otherwise returns 'OWait' wrapping an action that blocks until the
--   completion callback of every enqueued push 'Output' has run.
pushStream
    :: WorkerConf a
    -> a -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO OutputType
pushStream _ _ _ [] = return OObj
pushStream WorkerConf{..} pstrm reqvt pps0 = do
    pushable <- isPushable
    -- The header lookup happens before any push stream is opened, so no
    -- stream is allocated for a promise that could not be sent anyway.
    case (,) <$> mscheme <*> mauth of
        Just (scheme, auth) | pushable -> do
            tvar <- newTVarIO 0
            mapM_ (push scheme auth tvar) pps0
            -- pps0 is non-empty here, so there is always something to
            -- wait for.
            return $ OWait (waiter (length pps0) tvar)
        _ -> return OObj
  where
    -- Both values are optional in the request's value table. A missing
    -- one means "do not push" rather than a failure from a partial
    -- 'fromJust'.
    mscheme = getHeaderValue tokenScheme reqvt
    mauth =
        getHeaderValue tokenAuthority reqvt
            <|> getHeaderValue tokenHost reqvt
    -- Completion callback attached to every pushed 'Output'.
    increment tvar = atomically $ modifyTVar' tvar (+ 1)
    -- Block until at least @lim@ completion callbacks have run.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        checkSTM (n >= (lim :: Int))
    -- Open a stream for one promise and enqueue its response.
    push scheme auth tvar pp = do
        (pid, newstrm) <- makePushStream pstrm pp
        let promiseRequest =
                [ (tokenMethod, H.methodGet)
                , (tokenScheme, scheme)
                , (tokenAuthority, auth)
                , (tokenPath, promiseRequestPath pp)
                ]
            ot = OPush promiseRequest pid
            Response rsp = promiseResponse pp
            out = Output newstrm rsp ot Nothing $ increment tvar
        writeOutputQ out

-- | The responder handed to the server application by 'worker'.
--   The application calls it with its 'Response' and any
--   'PushPromise's; this function turns them into 'Output' values and
--   passes them to 'writeOutputQ'.
response
    :: WorkerConf a
    -> Manager
    -> T.Handle
    -> ThreadContinue
    -> a
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
    case outObjBody rsp of
        -- No body: push promises are not processed in this case.
        OutBodyNone -> do
            setThreadContinue tconf True
            enqueue OObj Nothing
        OutBodyBuilder _ -> pushThenEnqueue
        OutBodyFile _ -> pushThenEnqueue
        OutBodyStreaming body -> do
            otyp <- promised
            -- The application must not return while it is streaming:
            -- if it did, the stream would be closed too. This thread
            -- is therefore occupied for the duration, so a replacement
            -- worker is spawned...
            spawnAction mgr
            -- ...and this thread stops once the streaming is over, to
            -- decrease the number of workers again.
            setThreadContinue tconf False
            -- The streaming body is a loop we cannot control, so its
            -- output is serialized through a dedicated bounded queue.
            tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
            enqueue otyp (Just tbq)
            let emit chunk = atomically $ writeTBQueue tbq chunk
                -- The timeout handle is paused while the write to the
                -- queue is in progress.
                push b = do
                    T.pause th
                    emit (StreamingBuilder b)
                    T.resume th
                flush = emit StreamingFlush
                finished = emit $ StreamingFinished (decCounter mgr)
            incCounter mgr
            body push flush `E.finally` finished
        OutBodyStreamingUnmask _ ->
            error "response: server does not support OutBodyStreamingUnmask"
  where
    (_, reqvt) = inpObjHeaders req
    -- Process the push promises against the request's value table.
    promised = pushStream wc strm reqvt pps
    -- Enqueue the response for this stream with a no-op completion action.
    enqueue otyp mtbq = writeOutputQ $ Output strm rsp otyp mtbq (return ())
    -- Shared path for bodies that are fully described by the response
    -- itself (builder and file bodies).
    pushThenEnqueue = do
        otyp <- promised
        setThreadContinue tconf True
        enqueue otyp Nothing

-- | Worker for server applications.
--   Repeatedly takes a request from 'readInputQ' and runs the server
--   application on it, until either the thread is killed by the worker
--   manager or the responder asked it not to continue.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
    sinfo <- newStreamInfo
    tcont <- newThreadContinue
    timeoutKillThread mgr $ loop sinfo tcont
  where
    loop sinfo tcont th = do
        setThreadContinue tcont True
        outcome <- E.trySyncOrAsync $ serveOne sinfo tcont th
        alive <- case outcome of
            Right () -> return True
            Left e@(SomeException _)
                -- killed by the local worker manager
                | Just KilledByHttp2ThreadManager{} <- E.fromException e -> return False
                -- killed by the local timeout manager
                | Just T.TimeoutThread <- E.fromException e -> abortCurrent sinfo
                -- any other exception
                | otherwise -> abortCurrent sinfo
        wanted <- getThreadContinue tcont
        clearStreamInfo sinfo
        when (alive && wanted) $ loop sinfo tcont th

    -- Serve exactly one request. The timeout handle is paused while
    -- waiting for input and resumed once a request is in hand.
    serveOne sinfo tcont th = do
        T.pause th
        Input strm req <- readInputQ
        let req' = req{inpObjBody = pausedRead th (inpObjBody req)}
        -- Remember the stream so that it can be cleaned up if the
        -- application is interrupted.
        setStreamInfo sinfo strm
        T.resume th
        T.tickle th
        let aux = Aux th mySockAddr peerSockAddr
        server (Request req') aux $ response wc mgr th tcont strm (Request req')

    -- Wrap a request-body reader so that the timeout handle is paused
    -- for the duration of each read.
    pausedRead th readBody = do
        T.pause th
        bs <- readBody
        T.resume th
        return bs

    -- Clean up the stream being served, if any; the worker stays alive.
    abortCurrent sinfo = do
        mstrm <- getStreamInfo sinfo
        maybe (return ()) workerCleanup mstrm
        return True

----------------------------------------------------------------

-- | A mutable flag shared by a responder and its worker.
--   The responder stores its verdict here as a kind of return value:
--   if it is 'True', the worker continues to serve requests;
--   otherwise, the worker finishes.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
-- | Create a fresh flag, initially 'True' (keep serving).
newThreadContinue :: IO ThreadContinue
newThreadContinue = fmap ThreadContinue (newIORef True)

{-# INLINE setThreadContinue #-}
-- | Overwrite the flag with the given value.
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue flag) keepGoing = flag `writeIORef` keepGoing

{-# INLINE getThreadContinue #-}
-- | Read the current value of the flag.
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue tc = case tc of
    ThreadContinue flag -> readIORef flag

----------------------------------------------------------------

-- | The type for cleaning up.
--   A mutable slot holding the stream a worker is currently serving,
--   or 'Nothing' when it is serving none. The worker fills it once a
--   request has been read, consults it when the server application is
--   interrupted by an exception, and empties it after every request.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

{-# INLINE newStreamInfo #-}
-- | Create an empty slot (no stream recorded).
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = fmap StreamInfo (newIORef Nothing)

{-# INLINE clearStreamInfo #-}
-- | Forget the recorded stream, if any.
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo slot) = slot `writeIORef` Nothing

{-# INLINE setStreamInfo #-}
-- | Record the given stream, replacing any previous one.
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo slot) strm = writeIORef slot (Just strm)

{-# INLINE getStreamInfo #-}
-- | Read the recorded stream, or 'Nothing' if the slot is empty.
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo info = case info of
    StreamInfo slot -> readIORef slot