-- File: Stream.hs
--
-- Package: haskell-http2 5.3.10-1
--   * links: PTS, VCS
--   * area: main
--   * in suites: sid
--   * size: 55,120 kB
--   * sloc: haskell: 7,911; makefile: 3
-- File content: 174 lines, -rw-r--r--, 5,285 bytes
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}

module Network.HTTP2.H2.Stream where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.IORef
import Data.Maybe (fromMaybe)
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO

import Network.HTTP2.Frame
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types

----------------------------------------------------------------

-- | 'True' exactly when the state is 'Idle'.
isIdle :: StreamState -> Bool
isIdle st = case st of
    Idle -> True
    _ -> False

-- | 'True' for any 'Open' state, whatever its fields hold.
--
-- Note that this also covers the 'Open' states for which
-- 'isHalfClosedLocal' answers 'True'.
isOpen :: StreamState -> Bool
isOpen st = case st of
    Open{} -> True
    _ -> False

-- | 'True' for 'HalfClosedRemote' and also for any 'Closed' state.
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote st = case st of
    HalfClosedRemote -> True
    Closed{} -> True
    _ -> False

-- | 'True' for an 'Open' state whose first field is a 'Just', and also
-- for any 'Closed' state.
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal st = case st of
    Open (Just _) _ -> True
    Closed{} -> True
    _ -> False

-- | 'True' exactly when the state is built with 'Closed'.
isClosed :: StreamState -> Bool
isClosed st = case st of
    Closed{} -> True
    _ -> False

-- | 'True' exactly when the state is 'Reserved'.
isReserved :: StreamState -> Bool
isReserved st = case st of
    Reserved -> True
    _ -> False

----------------------------------------------------------------

-- | Allocate a fresh 'Stream' with identifier @sid@ whose state starts
-- as 'Idle'.
--
-- The tx flow is built from @txwin@ with 'newTxFlow' and kept in a
-- 'TVar'; the rx flow is built from @rxwin@ with 'newRxFlow' and kept in
-- an 'IORef'. The input 'MVar' starts empty and the last reference
-- starts as 'Nothing'. The parity of @sid@ is not checked here.
newOddStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newOddStream sid txwin rxwin = do
    stateRef <- newIORef Idle
    inputVar <- newEmptyMVar
    txFlowVar <- newTVarIO (newTxFlow txwin)
    rxFlowRef <- newIORef (newRxFlow rxwin)
    lastRef <- newIORef Nothing
    return $ Stream sid stateRef inputVar txFlowVar rxFlowRef lastRef

-- | Allocate a fresh 'Stream' with identifier @sid@ whose state starts
-- as 'Reserved'.
--
-- The tx flow is built from @txwin@ with 'newTxFlow' and kept in a
-- 'TVar'; the rx flow is built from @rxwin@ with 'newRxFlow' and kept in
-- an 'IORef'. The input 'MVar' starts empty and the last reference
-- starts as 'Nothing'. The parity of @sid@ is not checked here.
newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream sid txwin rxwin = do
    stateRef <- newIORef Reserved
    inputVar <- newEmptyMVar
    txFlowVar <- newTVarIO (newTxFlow txwin)
    rxFlowRef <- newIORef (newRxFlow rxwin)
    lastRef <- newIORef Nothing
    return $ Stream sid stateRef inputVar txFlowVar rxFlowRef lastRef

----------------------------------------------------------------

-- | Read the current 'StreamState' out of the stream's state reference.
{-# INLINE readStreamState #-}
readStreamState :: Stream -> IO StreamState
readStreamState strm = readIORef (streamState strm)

----------------------------------------------------------------

-- | Empty both stream tables and notify every stream that was in them.
--
-- The odd table is cleared and its streams notified first, then the even
-- table. For each stream:
--
-- * its input 'MVar' receives a 'Left' carrying the error, unless the
--   'MVar' is already full ('tryPutMVar' is used, so this never blocks);
-- * if its state is 'Open' with a 'Body', the body queue receives either
--   @Left err@ or, when there is no error to report,
--   @Right (mempty, True)@.
--
-- A 'ConnectionIsClosed' exception passed in is treated the same as
-- 'Nothing': there is no error to report to body queues, and
-- 'ConnectionIsClosed' is what the input 'MVar' receives.
closeAllStreams
    :: TVar OddStreamTable -> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams ovar evar mErr' = do
    clearOddStreamTable ovar >>= mapM_ notify
    clearEvenStreamTable evar >>= mapM_ notify
  where
    notify strm = do
        -- The state is sampled before the input MVar is touched.
        st <- readStreamState strm
        void $ tryPutMVar (streamInput strm) inputErr
        case st of
            Open _ (Body q _ _ _) ->
                atomically . writeTQueue q $ case reported of
                    Nothing -> Right (mempty, True)
                    Just e -> Left e
            _ ->
                return ()

    -- The error worth reporting: the caller's error, with
    -- 'ConnectionIsClosed' filtered out.
    reported :: Maybe SomeException
    reported = case fromException =<< mErr' of
        Just ConnectionIsClosed -> Nothing
        _ -> mErr'

    -- What the input MVar receives; defaults to 'ConnectionIsClosed'.
    inputErr :: Either SomeException a
    inputErr = Left $ case reported of
        Just e -> e
        Nothing -> toException ConnectionIsClosed

----------------------------------------------------------------

-- | Reason recorded once an outgoing body set up by 'withOutBodyIface'
-- has been terminated. It is thrown from later push or flush attempts on
-- that body.
data StreamTerminated
    = -- | Recorded by 'outBodyPushFinal'.
      StreamPushedFinal
    | -- | Recorded by 'outBodyCancel'.
      StreamCancelled
    | -- | Recorded when the 'withOutBodyIface' continuation has finished.
      StreamOutOfScope
    deriving stock (Show)
    deriving anyclass (Exception)

-- | Run @k@ with an 'OutBodyIface' that feeds 'StreamingChunk's into @tbq@.
--
-- A private 'TVar' records whether, and why, the body has been
-- terminated:
--
-- * 'outBodyPush' and 'outBodyFlush' enqueue a chunk; once the body is
--   terminated they throw the recorded 'StreamTerminated' instead.
-- * 'outBodyPushFinal' records 'StreamPushedFinal' and enqueues the last
--   builder followed by @StreamingFinished Nothing@, all in one
--   transaction; it throws like the above if already terminated.
-- * 'outBodyCancel' records 'StreamCancelled' and enqueues
--   'StreamingCancelled'; it does nothing if already terminated.
--
-- When @k@ finishes, normally or by exception, 'StreamOutOfScope' is
-- recorded and @StreamingFinished Nothing@ is enqueued, unless the body
-- was already terminated.
--
-- @unmask@ is exposed unchanged as 'outBodyUnmask'.
withOutBodyIface
    :: TBQueue StreamingChunk
    -> (forall a. IO a -> IO a)
    -> (OutBodyIface -> IO r)
    -> IO r
withOutBodyIface tbq unmask k = do
    terminated <- newTVarIO Nothing
    let enqueue = writeTBQueue tbq

        -- Run the action, or throw the recorded reason if there is one.
        whenNotTerminated act =
            readTVar terminated >>= maybe act throwSTM

        -- Record the reason and run the action, but only the first time.
        terminateWith reason act = do
            mTerminated <- readTVar terminated
            case mTerminated of
                Nothing -> writeTVar terminated (Just reason) >> act
                Just _ -> return ()

        iface =
            OutBodyIface
                { outBodyUnmask = unmask
                , outBodyPush = \b ->
                    atomically . whenNotTerminated $
                        enqueue (StreamingBuilder b NotEndOfStream)
                , outBodyPushFinal = \b ->
                    atomically . whenNotTerminated $ do
                        writeTVar terminated (Just StreamPushedFinal)
                        enqueue (StreamingBuilder b (EndOfStream Nothing))
                        enqueue (StreamingFinished Nothing)
                , outBodyFlush =
                    atomically . whenNotTerminated $
                        enqueue StreamingFlush
                , outBodyCancel = \mErr ->
                    atomically . terminateWith StreamCancelled $
                        enqueue (StreamingCancelled mErr)
                }

        finished =
            atomically . terminateWith StreamOutOfScope $
                enqueue (StreamingFinished Nothing)
    k iface `finally` finished

-- | Build the 'DynaNext' for a streaming body backed by @tbq@.
--
-- The queue is polled with a non-blocking 'tryReadTBQueue', one
-- transaction per poll, and that polling action is handed to
-- 'fillStreamBodyGetNext'.
nextForStreaming
    :: TBQueue StreamingChunk
    -> DynaNext
nextForStreaming tbq = fillStreamBodyGetNext pollChunk
  where
    pollChunk = atomically (tryReadTBQueue tbq)