File: Worker.hs

package info (click to toggle)
haskell-http2 5.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 55,120 kB
  • sloc: haskell: 7,911; makefile: 3
file content (185 lines) | stat: -rw-r--r-- 6,350 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    runServer,
) where

import Control.Concurrent.STM
import Data.IORef
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.HTTP.Types
import qualified System.ThreadManager as T

import Imports hiding (insert)
import Network.HTTP2.Frame
import Network.HTTP2.H2

----------------------------------------------------------------

-- | Launch the 'Server' for one request on a thread forked with
--   'T.forkManagedTimeout'.
--
--   The server receives the request, an 'Aux' built from the thread
--   handle and the two socket addresses of the 'Context', and
--   'sendResponse' as its response callback. Once the server returns,
--   'adjustRxWindow' is called for the stream.
runServer :: Config -> Server -> Launch
runServer conf server ctx@Context{..} strm req =
    T.forkManagedTimeout threadManager label worker
  where
    label = "H2 response sender for stream " ++ show (streamNumber strm)
    worker th = do
        lc <- newLoopCheck strm Nothing
        server request aux $ sendResponse conf ctx lc strm request
        adjustRxWindow ctx strm
      where
        aux = Aux th mySockAddr peerSockAddr
        request = Request req{inpObjBody = pausedBody}
        -- Every read of the request body is bracketed by pause/resume on
        -- the thread handle; presumably this suspends the thread's
        -- timeout while waiting for the peer (TODO confirm).
        pausedBody = T.pause th *> inpObjBody req <* T.resume th

----------------------------------------------------------------

-- | The response callback handed to a 'Server' by 'runServer'.
--   Given the server's 'Response' and its 'PushPromise's, it calls
--   'pushStream' for the promises, runs the waiting action if
--   'pushStream' returned one, and then passes the response itself to
--   'sendHeaderBody'.
sendResponse
    :: Config
    -> Context
    -> LoopCheck
    -> Stream
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
sendResponse conf ctx lc strm (Request req) (Response rsp) pps = do
    -- 'sequence_' over the 'Maybe' runs the waiter when there is one
    -- and does nothing otherwise.
    pushStream conf ctx strm requestValues pps >>= sequence_
    sendHeaderBody conf ctx lc strm rsp
  where
    requestValues = snd (inpObjHeaders req)

----------------------------------------------------------------

-- | Start server push for the 'PushPromise's attached to a response.
--
--   Returns 'Nothing' when no push is started: the promise list is
--   empty, 'enablePush' is off in the peer's settings, or the parent
--   request carries no scheme or no authority/host value from which the
--   promised request could be built. Otherwise one managed thread is
--   forked per promise, and the result is an action which blocks until
--   every one of those threads has bumped the shared counter.
pushStream
    :: Config
    -> Context
    -> Stream -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO (Maybe (IO ()))
pushStream _ _ _ _ [] = return Nothing
pushStream conf ctx@Context{..} pstrm reqvt pps0 = do
    pushable <- enablePush <$> readIORef peerSettings
    case (pushable, mscheme, mauth) of
        (True, Just scheme, Just auth) -> do
            tvar <- newTVarIO 0
            mapM_ (push scheme auth tvar) pps0
            -- pps0 is non-empty here, so there is always something to
            -- wait for.
            return $ Just $ waiter (length pps0) tvar
        -- Push is disabled by the peer, or the promised request cannot
        -- be built: skip pushing rather than fail on a missing field.
        _ -> return Nothing
  where
    -- Both values depend only on the parent request, so they are looked
    -- up once instead of once per promise.
    mscheme = getFieldValue tokenScheme reqvt
    mauth =
        getFieldValue tokenAuthority reqvt
            <|> getFieldValue tokenHost reqvt
    increment tvar = atomically $ modifyTVar' tvar (+ 1)
    -- Checking if all pushes are done.
    waiter lim tvar = atomically $ do
        n <- readTVar tvar
        check (n >= lim)
    -- Fork one thread which opens a push stream, announces the promised
    -- request on the parent stream id and then sends the promised
    -- response on the new stream.
    push scheme auth tvar pp =
        T.forkManaged threadManager "H2 server push" $ do
            (pid, newstrm) <- makePushStream ctx pstrm
            let promiseRequest =
                    [ (tokenMethod, methodGet)
                    , (tokenScheme, scheme)
                    , (tokenAuthority, auth)
                    , (tokenPath, promiseRequestPath pp)
                    ]
                ot = OPush promiseRequest pid
                Response rsp = promiseResponse pp
            -- NOTE(review): the counter is bumped once the push stream
            -- exists, before the promise is handed to the sender;
            -- confirm this is the intended synchronisation point.
            increment tvar
            lc <- newLoopCheck newstrm Nothing
            syncWithSender ctx newstrm ot lc
            sendHeaderBody conf ctx lc newstrm rsp

----------------------------------------------------------------

-- | Open a new stream for a push via 'openEvenStreamWait' and pair it
--   with the stream number of the parent stream.
--
--   Note that the returned 'StreamId' is the parent's number, not the
--   number of the newly opened stream.
makePushStream :: Context -> Stream -> IO (StreamId, Stream)
makePushStream ctx pstrm = do
    -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
    (_, pushed) <- openEvenStreamWait ctx
    return (streamNumber pstrm, pushed)

----------------------------------------------------------------

-- | Hand the headers, body and trailers of an 'OutObj' to the sender.
--
--   The body is first turned into an optional "next" producer and, for
--   the two streaming variants, a queue created by 'sendStreaming'.
--   The queue (or 'Nothing') is stored in the 'lcTBQ' field of the
--   'LoopCheck' before 'syncWithSender' is called with an 'OHeader'.
sendHeaderBody
    :: Config
    -> Context
    -> LoopCheck
    -> Stream
    -> OutObj
    -> IO ()
sendHeaderBody Config{..} ctx lc strm OutObj{..} = do
    (mnext, mtbq) <- prepare outObjBody
    let header = OHeader outObjHeaders mnext outObjTrailers
    syncWithSender ctx strm header (lc{lcTBQ = mtbq})
  where
    -- One equation per body variant; only streaming bodies have a queue.
    prepare OutBodyNone = return (Nothing, Nothing)
    prepare (OutBodyFile (FileSpec path fileoff bytecount)) = do
        (pread, sentinel) <- confPositionReadMaker path
        return
            ( Just (fillFileBodyGetNext pread fileoff bytecount sentinel)
            , Nothing
            )
    prepare (OutBodyBuilder builder) =
        return (Just (fillBuilderBodyGetNext builder), Nothing)
    prepare (OutBodyStreaming strmbdy) =
        streaming $ \OutBodyIface{..} -> strmbdy outBodyPush outBodyFlush
    prepare (OutBodyStreamingIface strmbdy) = streaming strmbdy
    -- Shared tail of both streaming variants: start the producer thread
    -- and read the body from its queue.
    streaming body = do
        q <- sendStreaming ctx strm body
        return (Just (nextForStreaming q), Just q)

----------------------------------------------------------------

-- | Run a streaming body on its own managed thread and return the
--   queue it writes to.
--
--   A bounded queue is created, and a thread forked with
--   'T.forkManagedTimeout' runs the body against the 'OutBodyIface'
--   that 'withOutBodyIface' builds over that queue. The queue is
--   returned right after forking.
sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm strmbdy = do
    tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
    T.forkManagedTimeout threadManager label $ \th -> do
        -- Both push operations are bracketed by pause/resume on the
        -- thread handle; presumably this suspends the thread's timeout
        -- while the push is in progress (TODO confirm).
        let paused action = T.pause th >> action >> T.resume th
            wrap iface =
                iface
                    { outBodyPush = paused . outBodyPush iface
                    , outBodyPushFinal = paused . outBodyPushFinal iface
                    }
        withOutBodyIface tbq id (strmbdy . wrap)
    return tbq
  where
    label = "H2 response streaming sender for " ++ show (streamNumber strm)