1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.StreamTable (
-- * Types
OddStreamTable (..),
emptyOddStreamTable,
EvenStreamTable (..),
emptyEvenStreamTable,
-- * Odd
insertOdd,
insertOdd',
deleteOdd,
lookupOdd,
getOddConcurrency,
getOddStreams,
clearOddStreamTable,
waitIncOdd,
-- * Even
insertEven,
insertEven',
deleteEven,
lookupEven,
getEvenConcurrency,
clearEvenStreamTable,
waitIncEven,
insertEvenCache,
deleteEvenCache,
lookupEvenCache,
getEvenStreams,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IntMap
import Network.Control (LRUCache)
import qualified Network.Control as LRUCache
import Imports
import Network.HTTP2.H2.Types (Stream (..))
----------------------------------------------------------------
-- | State shared by the @...Odd@ operations of this module: a concurrency
-- counter plus a map from stream key to 'Stream'.
--
-- Both fields are strict.  The record lives in a 'TVar' and is rebuilt on
-- every insert\/delete, so lazy fields would let chains of unevaluated
-- @+ 1@ \/ @- 1@ \/ 'IntMap.insert' applications accumulate until somebody
-- happens to read the corresponding field.
data OddStreamTable = OddStreamTable
    { oddConc :: !Int
    -- ^ Concurrency counter: incremented by 'insertOdd' and 'waitIncOdd',
    -- decremented by 'deleteOdd'.
    , oddTable :: !(IntMap Stream)
    -- ^ Streams currently registered, keyed by 'IntMap.Key'.
    }
-- | An 'OddStreamTable' whose counter is zero and whose stream map is empty.
emptyOddStreamTable :: OddStreamTable
emptyOddStreamTable =
    OddStreamTable
        { oddConc = 0
        , oddTable = IntMap.empty
        }
-- | State shared by the @...Even@ operations of this module: a concurrency
-- counter, a map from stream key to 'Stream', and a cache keyed by
-- @(method, path)@.
--
-- All fields are strict.  The record lives in a 'TVar' and is rebuilt on
-- every update, so lazy fields would let unevaluated counter, map and cache
-- updates accumulate until the corresponding field is read.
data EvenStreamTable = EvenStreamTable
    { evenConc :: !Int
    -- ^ Concurrency counter: incremented by 'insertEven', 'insertEvenCache'
    -- and 'waitIncEven', decremented by 'deleteEven'.
    , evenTable :: !(IntMap Stream)
    -- ^ Streams currently registered, keyed by 'IntMap.Key'.
    , evenCache :: !(LRUCache (Method, ByteString) Stream)
    -- ^ Cache must contain 'Stream' instead of a stream identifier because
    -- a 'Stream' is deleted from the table when end-of-stream is received,
    -- and the cache is looked up after that.
    -- The 'LRUCache' is not used as an LRU but as a fixed-size map.
    }
-- | An 'EvenStreamTable' with a zero counter, no streams, and an empty cache
-- created by passing @lim@ to 'LRUCache.empty'.
emptyEvenStreamTable :: Int -> EvenStreamTable
emptyEvenStreamTable lim =
    EvenStreamTable
        { evenConc = 0
        , evenTable = IntMap.empty
        , evenCache = LRUCache.empty lim
        }
----------------------------------------------------------------
-- | Register stream @v@ under key @k@ and increment the concurrency counter
-- by one, in a single STM transaction.
--
-- The update is strict: 'modifyTVar'' evaluates the new record, and both new
-- fields are forced before the record is built, so no chain of pending
-- @+ 1@ \/ 'IntMap.insert' thunks is left inside the 'TVar'.
insertOdd :: TVar OddStreamTable -> IntMap.Key -> Stream -> IO ()
insertOdd var k v = atomically $ modifyTVar' var $ \OddStreamTable{..} ->
    let oddConc' = oddConc + 1
        oddTable' = IntMap.insert k v oddTable
     in oddConc' `seq` oddTable' `seq` OddStreamTable oddConc' oddTable'
-- | Register stream @v@ under key @k@ WITHOUT touching the concurrency
-- counter.
--
-- NOTE(review): presumably meant for callers that already bumped the counter
-- through 'waitIncOdd' -- confirm against the call sites.
--
-- The update is strict ('modifyTVar'' plus forcing the new map) so that
-- pending 'IntMap.insert' thunks do not accumulate inside the 'TVar'.
insertOdd' :: TVar OddStreamTable -> IntMap.Key -> Stream -> IO ()
insertOdd' var k v = atomically $ modifyTVar' var $ \OddStreamTable{..} ->
    let oddTable' = IntMap.insert k v oddTable
     in oddTable' `seq` OddStreamTable oddConc oddTable'
-- | Remove key @k@ from the odd table and decrement the concurrency counter
-- by one; if a stream was registered under @k@, try to hand @Left err@ to its
-- 'streamInput'.
--
-- The counter is decremented whether or not @k@ was present in the table.
-- 'tryPutMVar' is used and its result is discarded, so nothing is delivered
-- when the stream's input is already full.
deleteOdd :: TVar OddStreamTable -> IntMap.Key -> SomeException -> IO ()
deleteOdd var k err = do
    mstrm <- atomically deleteStream
    case mstrm of
        Nothing -> return () -- Stream was already removed
        Just strm -> void . tryPutMVar (streamInput strm) $ Left err
  where
    -- Returns the stream that was stored under @k@ before the deletion.
    deleteStream :: STM (Maybe Stream)
    deleteStream = do
        OddStreamTable{..} <- readTVar var
        let oddConc' = oddConc - 1
            oddTable' = IntMap.delete k oddTable
            -- Force both fields before storing so that the TVar never
            -- holds a growing chain of unevaluated updates.
            tbl' = oddConc' `seq` oddTable' `seq` OddStreamTable oddConc' oddTable'
        writeTVar var $! tbl'
        return $ IntMap.lookup k oddTable
-- | Look up the stream registered under key @k@ in the current snapshot of
-- the odd table.
lookupOdd :: TVar OddStreamTable -> IntMap.Key -> IO (Maybe Stream)
lookupOdd var k = do
    tbl <- readTVarIO var
    return $ IntMap.lookup k (oddTable tbl)
-- | Read the current value of the odd table's concurrency counter.
getOddConcurrency :: TVar OddStreamTable -> IO Int
getOddConcurrency var = do
    tbl <- readTVarIO var
    return $ oddConc tbl
-- | Return the map of all streams currently held in the odd table.
getOddStreams :: TVar OddStreamTable -> IO (IntMap Stream)
getOddStreams var = do
    tbl <- readTVarIO var
    return $ oddTable tbl
-- | Atomically replace the odd table with 'emptyOddStreamTable' and return
-- the streams it held before the reset.
clearOddStreamTable :: TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable var = atomically $ do
    OddStreamTable{oddTable = streams} <- readTVar var
    streams <$ writeTVar var emptyOddStreamTable
-- | STM action that retries until the odd concurrency counter is below
-- @maxConc@, then increments the counter by one.
--
-- Only the counter changes; the stream map is carried over untouched.
waitIncOdd :: TVar OddStreamTable -> Int -> STM ()
waitIncOdd var maxConc = do
    tbl <- readTVar var
    let current = oddConc tbl
    -- 'check' retries the transaction while the limit is reached.
    check (current < maxConc)
    writeTVar var tbl{oddConc = current + 1}
----------------------------------------------------------------
-- | Register stream @v@ under key @k@ and increment the concurrency counter
-- by one, in a single STM transaction.  The cache is left unchanged.
--
-- The update is strict: 'modifyTVar'' evaluates the new record, and both new
-- fields are forced before the record is built, so no chain of pending
-- @+ 1@ \/ 'IntMap.insert' thunks is left inside the 'TVar'.
insertEven :: TVar EvenStreamTable -> IntMap.Key -> Stream -> IO ()
insertEven var k v = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenConc' = evenConc + 1
        evenTable' = IntMap.insert k v evenTable
     in evenConc' `seq` evenTable' `seq` EvenStreamTable evenConc' evenTable' evenCache
-- | Register stream @v@ under key @k@ WITHOUT touching the concurrency
-- counter or the cache.
--
-- NOTE(review): presumably meant for callers that already bumped the counter
-- through 'waitIncEven' -- confirm against the call sites.
--
-- The update is strict ('modifyTVar'' plus forcing the new map) so that
-- pending 'IntMap.insert' thunks do not accumulate inside the 'TVar'.
insertEven' :: TVar EvenStreamTable -> IntMap.Key -> Stream -> IO ()
insertEven' var k v = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenTable' = IntMap.insert k v evenTable
     in evenTable' `seq` EvenStreamTable evenConc evenTable' evenCache
-- | Remove key @k@ from the even table and decrement the concurrency counter
-- by one; if a stream was registered under @k@, try to hand @Left err@ to its
-- 'streamInput'.  The cache is left unchanged.
--
-- The counter is decremented whether or not @k@ was present in the table.
-- 'tryPutMVar' is used and its result is discarded, so nothing is delivered
-- when the stream's input is already full.
deleteEven :: TVar EvenStreamTable -> IntMap.Key -> SomeException -> IO ()
deleteEven var k err = do
    mstrm <- atomically deleteStream
    case mstrm of
        Nothing -> return () -- Stream was already removed
        Just strm -> void . tryPutMVar (streamInput strm) $ Left err
  where
    -- Returns the stream that was stored under @k@ before the deletion.
    deleteStream :: STM (Maybe Stream)
    deleteStream = do
        EvenStreamTable{..} <- readTVar var
        let evenConc' = evenConc - 1
            evenTable' = IntMap.delete k evenTable
            -- Force both fields before storing so that the TVar never
            -- holds a growing chain of unevaluated updates.
            tbl' =
                evenConc' `seq`
                    evenTable' `seq`
                        EvenStreamTable evenConc' evenTable' evenCache
        writeTVar var $! tbl'
        return $ IntMap.lookup k evenTable
-- | Look up the stream registered under key @k@ in the current snapshot of
-- the even table.
lookupEven :: TVar EvenStreamTable -> IntMap.Key -> IO (Maybe Stream)
lookupEven var k = do
    tbl <- readTVarIO var
    return $ IntMap.lookup k (evenTable tbl)
-- | Read the current value of the even table's concurrency counter.
getEvenConcurrency :: TVar EvenStreamTable -> IO Int
getEvenConcurrency var = do
    tbl <- readTVarIO var
    return $ evenConc tbl
-- | Atomically replace the even table with a fresh empty one and return the
-- streams it held before the reset.
--
-- NOTE(review): the replacement is built with @emptyEvenStreamTable 0@, so
-- the cache is re-created with a limit argument of 0 rather than the limit
-- the table was originally created with -- confirm this is intended.
clearEvenStreamTable :: TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable var = atomically $ do
    EvenStreamTable{evenTable = streams} <- readTVar var
    streams <$ writeTVar var (emptyEvenStreamTable 0)
-- | STM action that retries until the even concurrency counter is below
-- @maxConc@, then increments the counter by one.
--
-- Only the counter changes; the stream map and the cache are carried over
-- untouched.
waitIncEven :: TVar EvenStreamTable -> Int -> STM ()
waitIncEven var maxConc = do
    tbl <- readTVar var
    let current = evenConc tbl
    -- 'check' retries the transaction while the limit is reached.
    check (current < maxConc)
    writeTVar var tbl{evenConc = current + 1}
-- | Register @strm@ in both the stream map (keyed by its 'streamNumber') and
-- the cache (keyed by @(method, path)@), and increment the concurrency
-- counter by one, all in a single STM transaction.
--
-- The update is strict: 'modifyTVar'' evaluates the new record, and all
-- three new fields are forced before the record is built, so no chain of
-- pending updates is left inside the 'TVar'.
insertEvenCache
    :: TVar EvenStreamTable -> Method -> ByteString -> Stream -> IO ()
insertEvenCache var method path strm@Stream{..} = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenConc' = evenConc + 1
        evenTable' = IntMap.insert streamNumber strm evenTable
        evenCache' = LRUCache.insert (method, path) strm evenCache
     in evenConc' `seq`
            evenTable' `seq`
                evenCache' `seq`
                    EvenStreamTable evenConc' evenTable' evenCache'
-- | Remove the cache entry keyed by @(m, path)@.  The concurrency counter and
-- the stream map are left unchanged.
--
-- The update is strict ('modifyTVar'' plus forcing the new cache) so that
-- pending 'LRUCache.delete' thunks do not accumulate inside the 'TVar'.
deleteEvenCache :: TVar EvenStreamTable -> Method -> ByteString -> IO ()
deleteEvenCache var m path = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenCache' = LRUCache.delete (m, path) evenCache
     in evenCache' `seq` EvenStreamTable evenConc evenTable evenCache'
-- | Look up the stream cached under @(m, path)@ in the current snapshot of
-- the even table's cache.
lookupEvenCache
    :: TVar EvenStreamTable -> Method -> ByteString -> IO (Maybe Stream)
lookupEvenCache var m path = do
    tbl <- readTVarIO var
    return $ LRUCache.lookup (m, path) (evenCache tbl)
-- | Return the map of all streams currently held in the even table.
getEvenStreams :: TVar EvenStreamTable -> IO (IntMap Stream)
getEvenStreams var = do
    tbl <- readTVarIO var
    return $ evenTable tbl
|