File: Run.hs

package info (click to toggle)
haskell-http2 5.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 55,120 kB
  • sloc: haskell: 7,911; makefile: 3
file content (256 lines) | stat: -rw-r--r-- 9,582 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import qualified System.ThreadManager as T
import Text.Read (readMaybe)

import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Client configuration
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ https or http
    , authority :: Authority
    -- ^ Server name
    , cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net
    , connectionWindowSize :: WindowSize
    -- ^ The window size of connection.
    , settings :: Settings
    -- ^ Settings
    }
    deriving (Eq, Show)

-- | The default client config.
--
-- The @authority@ field will be used to set the HTTP2 @:authority@
-- pseudo-header. In most cases you will want to override it to be equal to
-- @host@.
--
-- Further background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2) also
-- allows @host:port@, and most servers will accept this too. However, when
-- using TLS, many servers will expect the TLS SNI server name and the
-- @:authority@ pseudo-header to be equal, and for TLS SNI the server name
-- should not include the port. Note that HTTP2 explicitly /disallows/ using
-- @userinfo\@@ as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 16777216, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing, pingRateLimit = 10, emptyFrameRateLimit = 4, settingsRateLimit = 4, rstRateLimit = 4}}
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }

-- | Running HTTP/2 client.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{..} conf client = do
    ctx <- setup cconf conf
    runH2 conf ctx $ runClient ctx
  where
    serverMaxStreams ctx = do
        mx <- maxConcurrentStreams <$> readIORef (peerSettings ctx)
        case mx of
            Nothing -> return maxBound
            Just x -> return x
    possibleClientStream ctx = do
        x <- serverMaxStreams ctx
        n <- oddConc <$> readTVarIO (oddStreamTable ctx)
        return (x - n)
    aux ctx =
        Aux
            { auxPossibleClientStreams = possibleClientStream ctx
            }
    clientCore ctx req processResponse = do
        (strm, moutobj) <- makeStream ctx scheme authority req
        case moutobj of
            Nothing -> return ()
            Just outobj -> sendRequest conf ctx strm outobj False
        rsp <- getResponse strm
        x <- processResponse rsp
        adjustRxWindow ctx strm
        return x
    runClient ctx = client (clientCore ctx) $ aux ctx

-- | Launching a receiver and a sender.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf@ClientConfig{..} conf@Config{..} action = do
    ctx@Context{..} <- setup cconf conf
    let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
        putR req = do
            (strm, moutobj) <- makeStream ctx scheme authority req
            case moutobj of
                Nothing -> return ()
                Just outobj -> sendRequest conf ctx strm outobj True
            return (streamNumber strm, strm)
        get = getResponse
        create = openOddStreamWait ctx
    runClient <-
        action $ ClientIO confMySockAddr confPeerSockAddr putR get putB create
    runH2 conf ctx runClient

getResponse :: Stream -> IO Response
getResponse strm = do
    mRsp <- takeMVar $ streamInput strm
    case mRsp of
        Left err -> throwIO err
        Right rsp -> return $ Response rsp

setup :: ClientConfig -> Config -> IO Context
setup ClientConfig{..} conf@Config{..} = do
    let clientInfo = newClientInfo scheme authority
    ctx <-
        newContext
            clientInfo
            conf
            cacheLimit
            connectionWindowSize
            settings
            confTimeoutManager
    exchangeSettings ctx
    return ctx

runH2 :: Config -> Context -> IO a -> IO a
runH2 conf ctx runClient = do
    T.stopAfter mgr (try runAll >>= closureClient conf) $ \res ->
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) res
  where
    mgr = threadManager ctx
    runReceiver = frameReceiver ctx conf
    runSender = frameSender ctx conf
    runBackgroundThreads = do
        labelMe "H2 runBackgroundThreads"
        concurrently_ runReceiver runSender
    runAll = do
        er <- race runBackgroundThreads runClient
        case er of
            Left () -> undefined
            Right r -> return r

makeStream
    :: Context
    -> Scheme
    -> Authority
    -> Request
    -> IO (Stream, Maybe OutObj)
makeStream ctx@Context{..} scheme auth (Request req) = do
    -- Checking push promises
    let hdr0 = outObjHeaders req
        method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
        path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
    mstrm0 <- lookupEvenCache evenStreamTable method path
    case mstrm0 of
        Just strm0 -> do
            deleteEvenCache evenStreamTable method path
            return (strm0, Nothing)
        Nothing -> do
            -- Arch/Sender is originally implemented for servers where
            -- the ordering of responses can be out-of-order.
            -- But for clients, the ordering must be maintained.
            -- To implement this, 'outputQStreamID' is used.
            let isIPv6 = isJust (readMaybe auth :: Maybe IPv6)
                auth'
                    | isIPv6 = "[" <> UTF8.fromString auth <> "]"
                    | otherwise = UTF8.fromString auth
            let hdr1, hdr2 :: [Header]
                hdr1
                    | scheme /= "" = (":scheme", scheme) : hdr0
                    | otherwise = hdr0
                hdr2
                    | auth /= "" = (":authority", auth') : hdr1
                    | otherwise = hdr1
                req' = req{outObjHeaders = hdr2}
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (_sid, newstrm) <- openOddStreamWait ctx
            return (newstrm, Just req')

sendRequest :: Config -> Context -> Stream -> OutObj -> Bool -> IO ()
sendRequest Config{..} ctx@Context{..} strm OutObj{..} io = do
    let sid = streamNumber strm
    (mnext, mtbq) <- case outObjBody of
        OutBodyNone -> return (Nothing, Nothing)
        OutBodyFile (FileSpec path fileoff bytecount) -> do
            (pread, sentinel) <- confPositionReadMaker path
            let next = fillFileBodyGetNext pread fileoff bytecount sentinel
            return (Just next, Nothing)
        OutBodyBuilder builder -> do
            let next = fillBuilderBodyGetNext builder
            return (Just next, Nothing)
        OutBodyStreaming strmbdy -> do
            q <- sendStreaming ctx strm $ \iface ->
                outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
            let next = nextForStreaming q
            return (Just next, Just q)
        OutBodyStreamingIface strmbdy -> do
            q <- sendStreaming ctx strm strmbdy
            let next = nextForStreaming q
            return (Just next, Just q)
    let ot = OHeader outObjHeaders mnext outObjTrailers
    if io
        then do
            let out = makeOutputIO ctx strm ot
            pushOutput sid out
        else do
            (pop, out) <- makeOutput strm ot
            pushOutput sid out
            lc <- newLoopCheck strm mtbq
            T.forkManaged threadManager label $ syncWithSender' ctx pop lc
  where
    label = "H2 request sender for stream " ++ show (streamNumber strm)
    pushOutput sid out = atomically $ do
        sidOK <- readTVar outputQStreamID
        check (sidOK == sid)
        writeTVar outputQStreamID (sid + 2)
        enqueueOutputSTM outputQ out

sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm strmbdy = do
    tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
    T.forkManagedUnmask threadManager label $ \unmask ->
        withOutBodyIface tbq unmask strmbdy
    return tbq
  where
    label = "H2 request streaming sender for stream " ++ show (streamNumber strm)

exchangeSettings :: Context -> IO ()
exchangeSettings Context{..} = do
    connRxWS <- rxfBufSize <$> readIORef rxFlow
    let frames = makeNegotiationFrames mySettings connRxWS
        setframe = CFrames Nothing (connectionPreface : frames)
    writeIORef myFirstSettings True
    enqueueControl controlQ setframe

data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    , cioPeerSockAddr :: SockAddr
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    , cioReadResponse :: Stream -> IO Response
    , cioWriteBytes :: ByteString -> IO ()
    , cioCreateStream :: IO (StreamId, Stream)
    }