File: FillBuf.hs

package info (click to toggle)
haskell-http-semantics 0.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 144 kB
  • sloc: haskell: 1,071; makefile: 2
file content (247 lines) | stat: -rw-r--r-- 9,106 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.HTTP.Semantics.FillBuf (
    -- * Filling a buffer
    Next (..),
    DynaNext,
    BytesFilled,
    StreamingChunk (..),
    IsEndOfStream (..),
    CleanupStream,
    fillBuilderBodyGetNext,
    fillFileBodyGetNext,
    fillStreamBodyGetNext,
) where

import Control.Exception (SomeException)
import Control.Monad
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Data.ByteString.Internal
import Data.Int (Int64)
import Data.Maybe
import Foreign.Ptr (plusPtr)
import Network.ByteOrder
import Network.HTTP.Semantics.Client

----------------------------------------------------------------

-- | Write part of a streaming response to the write buffer
--
-- In @http2@ this will be used to construct a single HTTP2 @DATA@ frame
-- (see discussion of the maximum number of bytes, below).
--
-- The resulting 'Next' reports how many bytes were written, whether a flush
-- is required, and (optionally) the 'DynaNext' to call for the remaining data.
type DynaNext =
    Buffer
    -- ^ Write buffer
    -> Int
    -- ^ Maximum number of bytes we are allowed to write
    --
    -- In @http2@, this maximum will be set to the space left in the write
    -- buffer. Implicitly this also means that this maximum cannot exceed the
    -- maximum size of an HTTP2 frame, since in @http2@ the size of the write
    -- buffer is also used to set @SETTINGS_MAX_FRAME_SIZE@ (see
    -- @confBufferSize@).
    -> IO Next
    -- ^ Information on the data written, and on how to continue if not all data
    -- was written

-- | Number of bytes written to the write buffer, as reported in 'Next'
-- (the payload length).
type BytesFilled = Int

-- | Result of a single 'DynaNext' invocation
--
-- * 'Next': reports the bytes written, whether the caller must flush, and an
--   optional continuation.
-- * 'CancelNext': streaming was cancelled, optionally carrying an exception.
data Next
    = Next
        BytesFilled -- payload length
        Bool -- require flushing
        (Maybe DynaNext) -- continuation for the remaining data; 'Nothing' when nothing is left to write
    | CancelNext (Maybe SomeException)

----------------------------------------------------------------

-- | One instruction for the streaming body filler
--
-- Values of this type are obtained by 'fillStreamBodyGetNext' from its
-- argument action and interpreted one at a time.
data StreamingChunk
    = -- | Indicate that the stream is finished
      --
      -- The optional 'CleanupStream' action is run before the end of the
      -- stream is reported.
      StreamingFinished (Maybe CleanupStream)
    | -- | Indicate that the stream is cancelled, optionally with an exception
      StreamingCancelled (Maybe SomeException)
    | -- | Flush the stream
      --
      -- This will cause the write buffer to be written to the network socket,
      -- without waiting for more data.
      StreamingFlush
    | -- | Construct a DATA frame, optionally terminating the stream
      StreamingBuilder Builder IsEndOfStream

-- | Action to run prior to terminating the stream
--
-- It is executed right before the final 'Next' (the one without a
-- continuation) is returned for the stream.
type CleanupStream = IO ()

-- | Whether a 'StreamingBuilder' chunk is the last one of the stream
data IsEndOfStream
    = -- | The stream is not yet terminated
      NotEndOfStream
    | -- | The stream is terminated
      --
      -- In addition to indicating that the stream is terminated, we can also
      -- specify an optional 'CleanupStream' handler to be run.
      EndOfStream (Maybe CleanupStream)

----------------------------------------------------------------

-- | Start writing a 'Builder' body.
--
-- Runs the builder once against @buf@ with @room@ bytes available and turns
-- the builder's signal into a 'Next' via 'nextForBuilder'.
fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf room =
    B.runBuilder bb buf room >>= \(written, signal) ->
        pure (nextForBuilder written signal)

-- | First step of writing a file body.
--
-- Calls @pread@ with offset @start@, a byte limit of at most @room@ (and never
-- more than @bytecount@) and the write buffer, then lets 'nextForFile' decide
-- how to continue. The sentinel is not inspected here, only passed along.
fillFileBodyGetNext
    :: PositionRead -> FileOffset -> ByteCount -> Sentinel -> DynaNext
fillFileBodyGetNext pread start bytecount sentinel buf room =
    pread start (mini room bytecount) buf >>= \nread ->
        nextForFile
            (fromIntegral nread)
            pread
            (start + nread)
            (bytecount - nread)
            sentinel

-- | Write a streaming body, pulling 'StreamingChunk's from @takeQ@.
--
-- Each available chunk is interpreted by 'runStreamingChunk', which keeps
-- pulling further chunks through the same loop. When @takeQ@ yields 'Nothing',
-- the bytes written so far are reported without requesting a flush, and the
-- continuation restarts the loop with a total of 0.
fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ = go 0
  where
    go :: NextWithTotal
    go filled buf room = takeQ >>= maybe nothingQueued dispatch
      where
        nothingQueued = return $ Next filled False (Just $ go 0)
        dispatch chunk = runStreamingChunk chunk go filled buf room

----------------------------------------------------------------

-- | Resume a builder that asked for at least @minReq@ bytes of buffer space.
--
-- If @room@ is too small, nothing is written: a flush is requested and the
-- same step is offered again as the continuation. Otherwise the writer runs
-- and its signal is translated by 'nextForBuilder'.
fillBufBuilderOne :: Int -> B.BufferWriter -> DynaNext
fillBufBuilderOne minReq writer buf0 room
    | room < minReq = pure $ Next 0 True (Just retry)
    | otherwise = do
        (written, signal) <- writer buf0 room
        pure $ nextForBuilder written signal
  where
    retry = fillBufBuilderOne minReq writer

-- | Resume a builder that handed over a 'ByteString' to be inserted directly.
--
-- If the whole string fits into @room@, it is copied and the writer is run on
-- the space that remains. Otherwise only the first @room@ bytes are copied and
-- the rest is re-queued as a fresh 'B.Chunk' signal for the next call.
fillBufBuilderTwo :: ByteString -> B.BufferWriter -> DynaNext
fillBufBuilderTwo bs writer buf0 room =
    if pending > room
        then do
            let (now, later) = BS.splitAt room bs
            void $ copy buf0 now
            return $ nextForBuilder room (B.Chunk later writer)
        else do
            buf1 <- copy buf0 bs
            (written, signal) <- writer buf1 (room - pending)
            return $ nextForBuilder (pending + written) signal
  where
    pending = BS.length bs

-- | Translate a builder signal into a 'Next', given the bytes written so far.
--
-- A finished builder requests a flush and has no continuation; the other two
-- signals do not request a flush and continue with the matching resume step.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal =
    case signal of
        B.Done -> Next len True Nothing -- let's flush
        B.More minReq writer -> continueWith (fillBufBuilderOne minReq writer)
        B.Chunk bs writer -> continueWith (fillBufBuilderTwo bs writer)
  where
    continueWith = Next len False . Just

----------------------------------------------------------------

-- | Like 'DynaNext', but with additional argument indicating total bytes written
--
-- Since @http2@ uses @DynaNext@ to construct a /single/ @DATA@ frame, the
-- \"total number of bytes written\" refers to the current size of the payload
-- of that @DATA@ frame.
--
-- Within this module, a 'NextWithTotal' that is stored as the continuation of
-- a 'Next' is always applied to a total of @0@.
type NextWithTotal = Int -> DynaNext

-- | Interpret one 'StreamingChunk'.
--
-- @next@ is the step to continue with after the chunk has been handled; it is
-- not used when the chunk finishes or cancels the stream.
runStreamingChunk :: StreamingChunk -> NextWithTotal -> NextWithTotal
runStreamingChunk chunk next =
    case chunk of
        StreamingBuilder builder NotEndOfStream -> runStreamingBuilder builder next
        StreamingBuilder builder (EndOfStream cleanup) -> runStreamingBuilder builder (endStream cleanup)
        StreamingFlush -> flushNow
        StreamingCancelled reason -> abort reason
        StreamingFinished cleanup -> endStream cleanup
  where
    -- Run the optional cleanup action, then report the bytes written so far
    -- with a flush and no continuation.
    endStream :: Maybe CleanupStream -> NextWithTotal
    endStream cleanup total _buf _room = do
        fromMaybe (pure ()) cleanup
        pure $ Next total True Nothing

    -- Report the bytes written so far with a flush; the continuation restarts
    -- @next@ with a total of 0.
    flushNow :: NextWithTotal
    flushNow total _buf _room = pure $ Next total True (Just $ next 0)

    -- Cancel streaming
    --
    -- The total passed in counts the bytes of the @DATA@ frame that is still
    -- being built and has not been sent (see 'DynaNext' and 'NextWithTotal').
    -- The documentation of 'outBodyCancel' states that such a partially
    -- constructed frame, if any, is discarded on cancellation, so the total
    -- can simply be dropped.
    abort :: Maybe SomeException -> NextWithTotal
    abort reason _total _buf _room = pure $ CancelNext reason

-- | Run a 'Builder' to completion, then hand over to @next@.
--
-- The builder may need several invocations (one per write buffer) before it
-- is done. Every continuation stored in a 'Next' restarts with a total of 0.
runStreamingBuilder :: Builder -> NextWithTotal -> NextWithTotal
runStreamingBuilder builder next = \total buf room ->
    B.runBuilder builder buf room >>= \result ->
        afterWrite result total buf room
  where
    -- React to the signal of a writer that has just written @written@ bytes
    -- at @buf@.
    afterWrite :: (Int, B.Next) -> NextWithTotal
    afterWrite (written, signal) total buf room =
        case signal of
            -- Builder finished: continue with @next@ in the space that is left.
            B.Done -> next filled (buf `plusPtr` written) (room - written)
            B.More minReq writer -> suspend (resumeWriter (Just minReq) writer)
            B.Chunk bs writer -> suspend (resumeChunk bs writer)
      where
        filled = total + written
        suspend step = return $ Next filled False (Just $ step 0)

    -- Run the writer, unless a minimum was requested and @room@ is below it;
    -- in that case request a flush and offer the same step again.
    resumeWriter :: Maybe Int -> B.BufferWriter -> NextWithTotal
    resumeWriter mMinReq writer total buf room
        | maybe False (room <) mMinReq =
            return $ Next total True (Just $ resumeWriter mMinReq writer 0)
        | otherwise = do
            result <- writer buf room
            afterWrite result total buf room

    -- Copy a directly inserted 'ByteString': all of it if it fits (then go on
    -- with the writer), otherwise the first @room@ bytes, keeping the rest
    -- for the next call.
    resumeChunk :: ByteString -> B.BufferWriter -> NextWithTotal
    resumeChunk bs writer total buf room
        | chunkLen <= room = do
            rest <- copy buf bs
            resumeWriter Nothing writer (total + chunkLen) rest (room - chunkLen)
        | otherwise = do
            let (fits, leftover) = BS.splitAt room bs
            void $ copy buf fits
            return $ Next (total + room) False (Just $ resumeChunk leftover writer 0)
      where
        chunkLen = BS.length bs

----------------------------------------------------------------

-- | Continuation step of writing a file body.
--
-- Calls @pread@ with offset @start@, a byte limit of at most @room@ (and never
-- more than @bytes@) and the write buffer. Afterwards, if the sentinel is a
-- 'Refresher', its action is run. 'nextForFile' then decides how to continue.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> Sentinel -> DynaNext
fillBufFile pread start bytes sentinel buf room = do
    nread <- pread start (mini room bytes) buf
    refreshIfRequested
    nextForFile (fromIntegral nread) pread (start + nread) (bytes - nread) sentinel
  where
    refreshIfRequested = case sentinel of
        Refresher refresh -> refresh
        _ -> return ()

-- | Decide how to continue after one read of a file body.
--
-- Arguments, in order: the number of bytes just read into the buffer, the
-- positional reader, the offset of the next unread byte, the number of bytes
-- still to be sent, and the sentinel.
--
-- * Nothing was read: report an empty payload, request a flush, and stop.
-- * Nothing is left to send: report the payload and stop.
-- * Otherwise: report the payload and continue with 'fillBufFile'.
--
-- Both stopping cases return a 'Next' without a continuation, so no later step
-- gets a chance to run a 'Closer' sentinel. It is therefore run on /both/ of
-- them; running it only when the remaining byte count reaches 0 would skip the
-- close action whenever a read yields no bytes.
nextForFile
    :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> Sentinel -> IO Next
nextForFile len pread start bytes sentinel
    | len == 0 = do
        closeIfRequested
        return $ Next 0 True Nothing -- let's flush
    | bytes == 0 = do
        closeIfRequested
        return $ Next len False Nothing
    | otherwise =
        return $ Next len False $ Just $ fillBufFile pread start bytes sentinel
  where
    -- Only a 'Closer' sentinel carries an action to run at the end.
    closeIfRequested = case sentinel of
        Closer close -> close
        _ -> return ()

{-# INLINE mini #-}
-- | Minimum of an 'Int' and an 'Int64', returned as 'Int64'.
mini :: Int -> Int64 -> Int64
mini i n = min (fromIntegral i) n