File: Sender.hs

package info (click to toggle)
haskell-http2 5.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 55,180 kB
  • sloc: haskell: 8,657; makefile: 5
file content (586 lines) | stat: -rw-r--r-- 23,624 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Network.HTTP2.H2.Sender (
    frameSender,
    fillBuilderBodyGetNext,
    fillFileBodyGetNext,
    fillStreamBodyGetNext,
    runTrailersMaker,
) where

import Control.Concurrent.MVar (putMVar)
import qualified Data.ByteString as BS
import Data.ByteString.Builder (Builder)
import qualified Data.ByteString.Builder.Extra as B
import Data.IORef (modifyIORef', readIORef, writeIORef)
import Data.IntMap.Strict (IntMap)
import Foreign.Ptr (minusPtr, plusPtr)
import Network.ByteOrder
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports
import Network.HPACK (TokenHeaderList, setLimitForEncoding, toHeaderTable)
import Network.HTTP2.Frame
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.EncodeFrame
import Network.HTTP2.H2.File
import Network.HTTP2.H2.HPACK
import Network.HTTP2.H2.Manager hiding (start)
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Settings
import Network.HTTP2.H2.Stream
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window

----------------------------------------------------------------

-- | Builder output that did not fit into the buffer on the previous fill
-- and must be written first on the next fill.
data Leftover
    = LZero
    -- ^ Nothing is pending.
    | LOne B.BufferWriter
    -- ^ A suspended writer to resume (created from a 'B.More' signal).
    | LTwo ByteString B.BufferWriter
    -- ^ A chunk to copy first, followed by a writer to resume (created
    -- from a 'B.Chunk' signal).

----------------------------------------------------------------

{-# INLINE waitStreaming #-}
-- | Block until the given queue is non-empty.  Nothing is removed from
-- the queue.
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ isEmptyTBQueue tbq >>= checkSTM . not

-- | What the sender loop has to handle next, as decided by the local
-- @dequeue@ of 'frameSender'.
data Switch
    = C Control
    -- ^ An entry read from the control queue.
    | O (Output Stream)
    -- ^ An entry read from the output queue.
    | Flush
    -- ^ Both queues are empty while the write buffer still holds bytes.

-- | Rethrow an exception: an 'HTTP2Error' is rethrown as it is, any other
-- exception is wrapped in 'BadThingHappen' first.
wrapException :: E.SomeException -> IO ()
wrapException se = case E.fromException se of
    Just (err :: HTTP2Error) -> E.throwIO err
    Nothing -> E.throwIO $ BadThingHappen se

-- | Peer SETTINGS_INITIAL_WINDOW_SIZE handling.
--
-- Merges the received settings list into 'peerSettings'.  When the
-- initial window size changed, 'increaseStreamWindowSize' is called with
-- the difference on every stream of the odd and the even stream table.
updatePeerSettings :: Context -> SettingsList -> IO ()
updatePeerSettings Context{peerSettings, oddStreamTable, evenStreamTable} peerAlist = do
    before <- currentInitialWindow
    modifyIORef' peerSettings (`fromSettingsList` peerAlist)
    after <- currentInitialWindow
    -- FIXME: race condition
    -- 1) newOddStream reads old peerSettings and
    --    insert it to its stream table after adjusting.
    -- 2) newOddStream reads new peerSettings and
    --    insert it to its stream table before adjusting.
    let delta = after - before
    when (delta /= 0) $ do
        adjustAll delta =<< getOddStreams oddStreamTable
        adjustAll delta =<< getEvenStreams evenStreamTable
  where
    -- The initial window size currently stored in the peer settings.
    currentInitialWindow = initialWindowSize <$> readIORef peerSettings

    adjustAll :: WindowSize -> IntMap Stream -> IO ()
    adjustAll delta strms = forM_ strms (`increaseStreamWindowSize` delta)

-- | The sender loop of a connection.
--
-- It repeatedly takes an entry from the control queue or the output
-- queue, serializes it into 'confWriteBuffer' and passes the filled part
-- of the buffer to 'confSendAll'.
--
-- The loop itself never returns normally; it ends only through an
-- exception, which is passed to 'wrapException' and rethrown.
frameSender :: Context -> Config -> Manager -> IO ()
frameSender
    ctx@Context{outputQ, controlQ, encodeDynamicTable, outputBufferLimit}
    Config{..}
    mgr = loop 0 `E.catch` wrapException
      where
        ----------------------------------------------------------------
        -- Main loop.  'off' is the number of bytes that are already written
        -- to the write buffer but not sent yet.
        loop :: Offset -> IO ()
        loop off =
            atomically (dequeue off) >>= \next -> case next of
                C ctl -> do
                    -- Control entries are written from the start of the buffer.
                    flushN off
                    control ctl
                    loop 0
                O out -> do
                    off' <- outputOrEnqueueAgain out off
                    flushIfNecessary off' >>= loop
                Flush -> do
                    flushN off
                    loop 0

        -- Flush the connection buffer to the socket, where the first 'n' bytes of
        -- the buffer are filled.  Nothing is sent when 'n' is 0.
        flushN :: Offset -> IO ()
        flushN n = when (n /= 0) $ bufferIO confWriteBuffer n confSendAll

        -- Flush the buffer when fewer than 512 bytes are left below the
        -- current output buffer limit.  Returns the offset at which writing
        -- continues: 0 after a flush, 'off' otherwise.
        flushIfNecessary :: Offset -> IO Offset
        flushIfNecessary off = do
            buflim <- readIORef outputBufferLimit
            if off > buflim - 512
                then flushN off >> return 0
                else return off

        -- Choose the next job.  The control queue has priority over the
        -- output queue.  When both are empty, a flush is requested if the
        -- buffer holds unsent bytes ('off' is not 0); otherwise the
        -- transaction retries.
        dequeue :: Offset -> STM Switch
        dequeue off = do
            noControl <- isEmptyTQueue controlQ
            if not noControl
                then C <$> readTQueue controlQ
                else do
                    -- FLOW CONTROL: WINDOW_UPDATE 0: send: respecting peer's limit
                    waitConnectionWindowSize ctx
                    noOutput <- isEmptyTQueue outputQ
                    case (noOutput, off) of
                        (False, _) -> O <$> readTQueue outputQ
                        (True, 0) -> retrySTM
                        (True, _) -> return Flush

        ----------------------------------------------------------------
        -- Copy the byte strings one after another starting at 'buf' and
        -- return the position following the last copied byte.
        copyAll :: [ByteString] -> Buffer -> IO Buffer
        copyAll frames buf = case frames of
            [] -> return buf
            bs : rest -> do
                buf' <- copy buf bs
                copyAll rest buf'

        -- Handle one entry of the control queue.
        -- called with off == 0: frames are copied to the start of the
        -- write buffer.
        --
        -- 'CFinish' rethrows the carried exception.  'CGoaway' sends the
        -- frame, fills the MVar and throws 'GoAwayIsSent'.  'CFrames' sends
        -- the frames and then applies the optional peer settings.
        control :: Control -> IO ()
        control ctl = case ctl of
            CFinish e -> E.throwIO e
            CGoaway bs mvar -> do
                sendFrames [bs]
                putMVar mvar ()
                E.throwIO GoAwayIsSent
            CFrames ms xs -> do
                sendFrames xs
                forM_ ms $ \peerAlist -> do
                    -- Peer SETTINGS_INITIAL_WINDOW_SIZE
                    updatePeerSettings ctx peerAlist
                    -- Peer SETTINGS_MAX_FRAME_SIZE: a whole frame has to fit,
                    -- but never more than the write buffer itself.
                    forM_ (lookup SettingsMaxFrameSize peerAlist) $ \payloadLen ->
                        writeIORef outputBufferLimit $
                            min confBufferSize (payloadLen + frameHeaderLength)
                    -- Peer SETTINGS_HEADER_TABLE_SIZE
                    forM_ (lookup SettingsHeaderTableSize peerAlist) $ \siz ->
                        setLimitForEncoding siz encodeDynamicTable
          where
            -- Copy the frames to the start of the write buffer and send them.
            sendFrames frames = do
                end <- copyAll frames confWriteBuffer
                flushN (end `minusPtr` confWriteBuffer)

        ----------------------------------------------------------------
        -- Serialize one output entry into the write buffer starting at
        -- 'off0' and return the offset following the written frames.
        -- 'lim' is the window size passed on to the body continuation.
        --
        -- The work depends on the output type:
        --
        -- * 'ONext': fill one DATA frame payload with the body continuation
        --   and hand over to 'fillDataHeaderEnqueueNext'.
        -- * 'OObj': write the header frames, then turn the body (if any)
        --   into an 'ONext' continuation and recurse.
        -- * 'OPush': write a PUSH_PROMISE frame, then continue as 'OObj'.
        --
        -- Any other output type is not expected here and raises an error.
        output :: Output Stream -> Offset -> WindowSize -> IO Offset
        output out@(Output strm OutObj{} (ONext curr tlrmkr) _ sentinel) off0 lim = do
            -- Data frame payload: leave room for the frame header, which is
            -- filled in once the payload length is known.
            buflim <- readIORef outputBufferLimit
            let payloadOff = off0 + frameHeaderLength
                datBuf = confWriteBuffer `plusPtr` payloadOff
                datBufSiz = buflim - payloadOff
            Next datPayloadLen reqflush mnext <- curr datBuf datBufSiz lim -- checkme
            -- Feed the payload just written to the trailers maker.
            NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
            fillDataHeaderEnqueueNext
                strm
                off0
                datPayloadLen
                mnext
                tlrmkr'
                sentinel
                out
                reqflush
        output out@(Output strm (OutObj hdr body tlrmkr) OObj mtbq _) off0 lim = do
            -- Header frame and Continuation frame
            let sid = streamNumber strm
                endOfStream = case body of
                    OutBodyNone -> True
                    _ -> False
            (ths, _) <- toHeaderTable $ fixHeaders hdr
            off' <- headerContinue sid ths endOfStream off0
            -- halfClosedLocal calls closed which removes
            -- the stream from stream table.
            when endOfStream $ halfClosedLocal ctx strm Finished
            off <- flushIfNecessary off'
            case body of
                OutBodyNone -> return off
                OutBodyFile (FileSpec path fileoff bytecount) -> do
                    (pread, sentinel') <- confPositionReadMaker path
                    refresh <- case sentinel' of
                        Closer closer -> timeoutClose mgr closer
                        Refresher refresher -> return refresher
                    let next = fillFileBodyGetNext pread fileoff bytecount refresh
                        out' = out{outputType = ONext next tlrmkr}
                    output out' off lim
                OutBodyBuilder builder -> do
                    let next = fillBuilderBodyGetNext builder
                        out' = out{outputType = ONext next tlrmkr}
                    output out' off lim
                OutBodyStreaming _ ->
                    output (setNextForStreaming mtbq tlrmkr out) off lim
                OutBodyStreamingUnmask _ ->
                    output (setNextForStreaming mtbq tlrmkr out) off lim
        output out@(Output strm _ (OPush ths pid) _ _) off0 lim = do
            -- Creating a push promise header
            -- Frame id should be associated stream id from the client.
            let sid = streamNumber strm
            len <- pushPromise pid sid ths off0
            off <- flushIfNecessary $ off0 + frameHeaderLength + len
            output out{outputType = OObj} off lim
        output _ _ _ =
            -- Not expected to be reached: 'outputOrEnqueueAgain' deals with
            -- 'OWait' itself before calling 'output'.  Fail with a message
            -- that names the place instead of a bare 'undefined'.
            error "Network.HTTP2.H2.Sender.output: unexpected output type"

        ----------------------------------------------------------------
        -- Turn a streaming output into an 'ONext' output whose continuation
        -- reads chunks from the given queue without blocking
        -- ('tryReadTBQueue').
        --
        -- A streaming body is expected to come with a queue.  If it is
        -- missing, using the continuation fails with a descriptive error
        -- (instead of the anonymous failure of 'fromJust').
        setNextForStreaming
            :: Maybe (TBQueue StreamingChunk)
            -> TrailersMaker
            -> Output Stream
            -> Output Stream
        setNextForStreaming mtbq tlrmkr out =
            out{outputType = ONext (fillStreamBodyGetNext takeQ) tlrmkr}
          where
            takeQ = atomically $ tryReadTBQueue tbq
            -- Kept lazy: the error is raised only when the queue is used.
            tbq = case mtbq of
                Just q -> q
                Nothing ->
                    error
                        "Network.HTTP2.H2.Sender.setNextForStreaming: streaming body without a queue"

        ----------------------------------------------------------------
        -- Either serialize the output now (via 'output') or hand it to
        -- 'forkAndEnqueueWhenReady' together with the condition it has to
        -- wait for.  Outputs of streams that are already half-closed (local)
        -- are dropped.  Any exception resets the stream: it is closed with
        -- 'ResetByMe' and an RST_STREAM frame is put on the control queue.
        outputOrEnqueueAgain :: Output Stream -> Offset -> IO Offset
        outputOrEnqueueAgain out@(Output strm _ otyp _ _) off = E.handle resetStream $ do
            state <- readStreamState strm
            if isHalfClosedLocal state then return off else dispatch
          where
            -- Postpone 'o' until 'ready' allows it; the offset is unchanged.
            enqueueLater ready o = do
                forkAndEnqueueWhenReady ready outputQ o mgr
                return off

            dispatch = case otyp of
                -- Checking if all push are done.
                OWait wait -> enqueueLater wait out{outputType = OObj}
                _ -> maybe checkStreamWindowSize checkStreaming (outputStrmQ out)

            -- A streaming body without a pending chunk has to wait for one.
            checkStreaming tbq = do
                noChunk <- atomically $ isEmptyTBQueue tbq
                if noChunk
                    then enqueueLater (waitStreaming tbq) out
                    else checkStreamWindowSize

            -- FLOW CONTROL: WINDOW_UPDATE: send: respecting peer's limit
            checkStreamWindowSize = do
                sws <- getStreamWindowSize strm
                if sws /= 0
                    then do
                        cws <- getConnectionWindowSize ctx -- not 0
                        output out off (min cws sws)
                    else enqueueLater (waitStreamWindowSize strm) out

            resetStream e = do
                closed ctx strm (ResetByMe e)
                enqueueControl controlQ $
                    CFrames Nothing [resetFrame InternalError (streamNumber strm)]
                return off

        ----------------------------------------------------------------
        -- Encode the header list 'ths0' of stream 'sid' as a HEADERS frame
        -- written at 'off0', followed by as many CONTINUATION frames as are
        -- needed for the headers that did not fit.  Returns the offset
        -- following the last frame that is still in the write buffer.
        --
        -- 'endOfStream' adds the END_STREAM flag to the frame flags.
        headerContinue :: StreamId -> TokenHeaderList -> Bool -> Offset -> IO Offset
        headerContinue sid ths0 endOfStream off0 = do
            buflim <- readIORef outputBufferLimit
            -- The payload is encoded first, right behind the space reserved
            -- for the frame header; the header is filled in afterwards.
            let offkv = off0 + frameHeaderLength
                bufkv = confWriteBuffer `plusPtr` offkv
                limkv = buflim - offkv
            -- 'ths' are the headers still to be sent, 'kvlen' is used as the
            -- payload length of this frame.
            (ths, kvlen) <- hpackEncodeHeader ctx bufkv limkv ths0
            if kvlen == 0
                -- Nothing was encoded: flush what is in the buffer and start
                -- over at the beginning of the buffer, still as HEADERS.
                then continue off0 ths FrameHeaders
                else do
                    let flag = getFlag ths
                        buf = confWriteBuffer `plusPtr` off0
                        off = offkv + kvlen
                    fillFrameHeader FrameHeaders kvlen sid flag buf
                    continue off ths FrameContinuation
          where
            eos = if endOfStream then setEndStream else id
            -- END_HEADERS is set only when no header is left to send.
            getFlag [] = eos $ setEndHeader defaultFlags
            getFlag _ = eos $ defaultFlags

            -- Send the remaining headers, one frame of type 'ft' per round.
            -- Each round flushes the buffer and encodes from its beginning.
            continue :: Offset -> TokenHeaderList -> FrameType -> IO Offset
            continue off [] _ = return off
            continue off ths ft = do
                flushN off
                -- Now off is 0
                buflim <- readIORef outputBufferLimit
                let bufHeaderPayload = confWriteBuffer `plusPtr` frameHeaderLength

                    headerPayloadLim = buflim - frameHeaderLength
                (ths', kvlen') <-
                    hpackEncodeHeaderLoop ctx bufHeaderPayload headerPayloadLim ths
                -- No progress even with the buffer emptied: give up with a
                -- connection error rather than looping forever.
                when (ths == ths') $
                    E.throwIO $
                        ConnectionErrorIsSent CompressionError sid "cannot compress the header"
                let flag = getFlag ths'
                    off' = frameHeaderLength + kvlen'
                fillFrameHeader ft kvlen' sid flag confWriteBuffer
                continue off' ths' FrameContinuation

        ----------------------------------------------------------------
        -- Finish a DATA frame whose payload ('datPayloadLen' bytes) is
        -- already written behind the header space at 'off', and arrange what
        -- follows.  Returns the new buffer offset (0 after a flush).
        --
        -- Arguments, in order: the stream, the offset of the frame, the
        -- payload length, the optional body continuation, the trailers
        -- maker, an action run when the body is finished, the output entry
        -- to re-enqueue, and whether a flush is requested.
        --
        -- Three cases are distinguished by the equations below:
        --
        -- 1. No continuation: this is the last DATA frame of the body.
        -- 2. A continuation but an empty payload: no frame is written.
        -- 3. A continuation and a non-empty payload.
        fillDataHeaderEnqueueNext
            :: Stream
            -> Offset
            -> Int
            -> Maybe DynaNext
            -> (Maybe ByteString -> IO NextTrailersMaker)
            -> IO ()
            -> Output Stream
            -> Bool
            -> IO Offset
        -- Case 1: the body is finished.
        fillDataHeaderEnqueueNext
            strm@Stream{streamNumber}
            off
            datPayloadLen
            Nothing
            tlrmkr
            tell
            _
            reqflush = do
                let buf = confWriteBuffer `plusPtr` off
                    off' = off + frameHeaderLength + datPayloadLen
                -- Ask the trailers maker for the trailers.  Without
                -- trailers the DATA frame itself carries END_STREAM;
                -- otherwise the trailers are sent as header frames below.
                (mtrailers, flag) <- do
                    Trailers trailers <- tlrmkr Nothing
                    if null trailers
                        then return (Nothing, setEndStream defaultFlags)
                        else return (Just trailers, defaultFlags)
                fillFrameHeader FrameData datPayloadLen streamNumber flag buf
                off'' <- handleTrailers mtrailers off'
                void tell
                halfClosedLocal ctx strm Finished
                decreaseWindowSize ctx strm datPayloadLen
                if reqflush
                    then do
                        flushN off''
                        return 0
                    else return off''
              where
                handleTrailers Nothing off0 = return off0
                handleTrailers (Just trailers) off0 = do
                    (ths, _) <- toHeaderTable trailers
                    headerContinue streamNumber ths True {- endOfStream -} off0
        -- Case 2: nothing was produced this time; only re-enqueue the output
        -- with its continuation.  The offset is kept unless a flush is
        -- requested.
        fillDataHeaderEnqueueNext
            _
            off
            0
            (Just next)
            tlrmkr
            _
            out
            reqflush = do
                let out' = out{outputType = ONext next tlrmkr}
                enqueueOutput outputQ out'
                if reqflush
                    then do
                        flushN off
                        return 0
                    else return off
        -- Case 3: write the DATA frame header (no flags), account for the
        -- payload in the flow-control windows and re-enqueue the output
        -- with its continuation.
        fillDataHeaderEnqueueNext
            strm@Stream{streamNumber}
            off
            datPayloadLen
            (Just next)
            tlrmkr
            _
            out
            reqflush = do
                let buf = confWriteBuffer `plusPtr` off
                    off' = off + frameHeaderLength + datPayloadLen
                    flag = defaultFlags
                fillFrameHeader FrameData datPayloadLen streamNumber flag buf
                decreaseWindowSize ctx strm datPayloadLen
                let out' = out{outputType = ONext next tlrmkr}
                enqueueOutput outputQ out'
                if reqflush
                    then do
                        flushN off'
                        return 0
                    else return off'

        ----------------------------------------------------------------
        -- Write a PUSH_PROMISE frame at 'off' and return its payload length.
        -- The frame header carries 'pid' as stream id; the payload starts
        -- with 'sid' as a 32-bit value, followed by the encoded headers.
        -- Headers that do not fit are dropped (the remainder is ignored).
        --
        -- NOTE(review): the encoding limit is derived from 'confBufferSize',
        -- whereas 'headerContinue' derives it from 'outputBufferLimit' --
        -- confirm that this difference is intended.
        pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int
        pushPromise pid sid ths off = do
            let payloadOff = off + frameHeaderLength -- checkme
            poke32 (fromIntegral sid) (confWriteBuffer `plusPtr` payloadOff) 0
            let hdrBlockOff = payloadOff + 4
            (_, kvlen) <-
                hpackEncodeHeader
                    ctx
                    (confWriteBuffer `plusPtr` hdrBlockOff)
                    (confBufferSize - hdrBlockOff)
                    ths
            let payloadLen = kvlen + 4
            -- No EndStream flag
            fillFrameHeader
                FramePushPromise
                payloadLen
                pid
                (setEndHeader defaultFlags)
                (confWriteBuffer `plusPtr` off)
            return payloadLen

        ----------------------------------------------------------------
        {-# INLINE fillFrameHeader #-}
        -- Encode a frame header made of the given frame type, payload
        -- length, stream id and flags into 'buf'.
        fillFrameHeader :: FrameType -> Int -> StreamId -> FrameFlags -> Buffer -> IO ()
        fillFrameHeader ftyp len sid flag buf =
            encodeFrameHeaderBuf
                ftyp
                FrameHeader{payloadLength = len, flags = flag, streamId = sid}
                buf

-- | Run the trailers maker on the first @siz@ bytes of @buf@, which are
-- handed over as a 'ByteString' by 'bufferIO'.
--
-- > bufferIO buf siz $ \bs -> tlrmkr (Just bs)
runTrailersMaker :: TrailersMaker -> Buffer -> Int -> IO NextTrailersMaker
runTrailersMaker tlrmkr buf siz = bufferIO buf siz (tlrmkr . Just)

----------------------------------------------------------------

-- | First fill step for a 'Builder' body: run the builder into the buffer,
-- limited by both the buffer size and the window limit, and derive the
-- next step from the builder's signal.
fillBuilderBodyGetNext :: Builder -> DynaNext
fillBuilderBodyGetNext bb buf siz lim =
    uncurry nextForBuilder <$> B.runBuilder bb buf (min siz lim)

-- | First fill step for a file body: read at most @min siz lim@ bytes (and
-- not more than @bytecount@) from position @start@ into the buffer and
-- derive the next step.  The refresh action is only passed on here.
fillFileBodyGetNext
    :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillFileBodyGetNext pread start bytecount refresh buf siz lim = do
    nread <- pread start (mini (min siz lim) bytecount) buf
    return $
        nextForFile
            (fromIntegral nread)
            pread
            (start + nread)
            (bytecount - nread)
            refresh

-- | First fill step for a streaming body: drain chunks obtained from
-- @takeQ@ into the buffer, limited by both the buffer size and the window
-- limit, and derive the next step.
fillStreamBodyGetNext :: IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext takeQ buf siz lim =
    toNext <$> runStreamBuilder buf (min siz lim) takeQ
  where
    toNext (cont, len, reqflush, leftover) =
        nextForStream cont len reqflush leftover takeQ

----------------------------------------------------------------

-- | Continue filling from what a builder left over on the previous step.
--
-- * 'LOne': resume the writer.
-- * 'LTwo': copy the pending chunk first.  If it fits, resume the writer
--   in the remaining room; otherwise copy as much as fits and keep the
--   rest of the chunk for the next step.
-- * 'LZero': not a valid input here; raises an error.
fillBufBuilder :: Leftover -> DynaNext
fillBufBuilder leftover buf0 siz0 lim = case leftover of
    LZero -> error "fillBufBuilder: LZero"
    LOne writer -> uncurry nextForBuilder <$> writer buf0 room
    LTwo bs writer
        | pending <= room -> do
            buf1 <- copy buf0 bs
            (written, signal) <- writer buf1 (room - pending)
            return $ nextForBuilder (pending + written) signal
        | otherwise -> do
            let (now, later) = BS.splitAt room bs
            void $ copy buf0 now
            return $ nextForBuilder room (B.Chunk later writer)
      where
        pending = BS.length bs
  where
    -- Usable space: limited by the buffer size and by the window limit.
    room = min siz0 lim

-- | Translate a builder signal into the next step.  'B.Done' ends the body
-- and requests a flush; 'B.More' and 'B.Chunk' continue with
-- 'fillBufBuilder' on the corresponding leftover, without a flush.
nextForBuilder :: BytesFilled -> B.Next -> Next
nextForBuilder len signal = case signal of
    B.Done -> Next len True Nothing -- let's flush
    B.More _ writer -> continueWith (LOne writer)
    B.Chunk bs writer -> continueWith (LTwo bs writer)
  where
    continueWith = Next len False . Just . fillBufBuilder

----------------------------------------------------------------

-- | Take chunks with @takeQ@ and run them into the buffer until no chunk
-- is available, a builder does not finish within the remaining room, or a
-- flush / finish chunk is seen.  For a finish chunk its action is run
-- before returning.
runStreamBuilder
    :: Buffer
    -> BufferSize
    -> IO (Maybe StreamingChunk)
    -> IO
        ( Bool -- continue
        , BytesFilled
        , Bool -- require flushing
        , Leftover
        )
runStreamBuilder buf0 room0 takeQ = go buf0 room0 0
  where
    go buf room total =
        takeQ >>= \mchunk -> case mchunk of
            Nothing -> return (True, total, False, LZero)
            Just StreamingFlush -> return (True, total, True, LZero)
            Just (StreamingFinished dec) -> do
                dec
                return (False, total, True, LZero)
            Just (StreamingBuilder builder) -> do
                (len, signal) <- B.runBuilder builder buf room
                let total' = total + len
                case signal of
                    -- This builder is done: try the next chunk in the
                    -- room that is left.
                    B.Done -> go (buf `plusPtr` len) (room - len) total'
                    B.More _ writer -> return (True, total', False, LOne writer)
                    B.Chunk bs writer -> return (True, total', False, LTwo bs writer)

-- | Continue filling a streaming body.  A leftover of the previous step is
-- written first; once it is completely written, further chunks are
-- drained with 'runStreamBuilder' into the remaining room.
fillBufStream :: Leftover -> IO (Maybe StreamingChunk) -> DynaNext
fillBufStream leftover0 takeQ buf0 siz0 lim0 = case leftover0 of
    LZero -> do
        (cont, len, reqflush, leftover) <- runStreamBuilder buf0 room0 takeQ
        finish cont len reqflush leftover
    LOne writer -> resume writer buf0 room0 0
    LTwo bs writer
        | pending <= room0 -> do
            buf1 <- copy buf0 bs
            resume writer buf1 (room0 - pending) pending
        | otherwise -> do
            -- The chunk alone fills the room; keep its rest for later.
            let (now, later) = BS.splitAt room0 bs
            void $ copy buf0 now
            finish True room0 False (LTwo later writer)
      where
        pending = BS.length bs
  where
    -- Usable space: limited by the buffer size and by the window limit.
    room0 = min siz0 lim0

    finish :: Bool -> BytesFilled -> Bool -> Leftover -> IO Next
    finish cont len reqflush l = return $ nextForStream cont len reqflush l takeQ

    -- Resume a suspended writer; 'sofar' bytes are already in the buffer.
    resume
        :: (Buffer -> BufferSize -> IO (Int, B.Next))
        -> Buffer
        -> BufferSize
        -> Int
        -> IO Next
    resume writer1 buf room sofar = do
        (len, signal) <- writer1 buf room
        let filled = sofar + len
        case signal of
            B.Done -> do
                (cont, extra, reqflush, leftover) <-
                    runStreamBuilder (buf `plusPtr` len) (room - len) takeQ
                finish cont (filled + extra) reqflush leftover
            B.More _ writer -> finish True filled False (LOne writer)
            B.Chunk bs writer -> finish True filled False (LTwo bs writer)

-- | Build the next step of a streaming body.  When the first argument is
-- 'False' the body is finished and there is no continuation; otherwise
-- the continuation is 'fillBufStream' on the given leftover.
nextForStream
    :: Bool
    -> BytesFilled
    -> Bool
    -> Leftover
    -> IO (Maybe StreamingChunk)
    -> Next
nextForStream cont len reqflush leftOrZero takeQ
    | cont = Next len reqflush $ Just (fillBufStream leftOrZero takeQ)
    | otherwise = Next len reqflush Nothing

----------------------------------------------------------------

-- | Continue filling from a file: read at most @min siz lim@ bytes (and
-- not more than @bytes@) from position @start@ into the buffer, run the
-- refresh action, and derive the next step.
fillBufFile :: PositionRead -> FileOffset -> ByteCount -> IO () -> DynaNext
fillBufFile pread start bytes refresh buf siz lim = do
    nread <- pread start (mini (min siz lim) bytes) buf
    refresh
    return $
        nextForFile
            (fromIntegral nread)
            pread
            (start + nread)
            (bytes - nread)
            refresh

-- | Build the next step of a file body from the number of bytes just read
-- and the number of bytes still to send.  Nothing read: finish and request
-- a flush.  Nothing left: finish without a flush.  Otherwise continue with
-- 'fillBufFile'.
nextForFile
    :: BytesFilled -> PositionRead -> FileOffset -> ByteCount -> IO () -> Next
nextForFile len pread start bytes refresh
    | len == 0 = Next 0 True Nothing -- let's flush
    | bytes == 0 = Next len False Nothing
    | otherwise = Next len False . Just $ fillBufFile pread start bytes refresh

{-# INLINE mini #-}
-- | Minimum of an 'Int' and an 'Int64', returned as 'Int64'.
mini :: Int -> Int64 -> Int64
mini i n = min (fromIntegral i) n