File: StreamTable.hs

package info (click to toggle)
haskell-http2 5.3.10-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 55,120 kB
  • sloc: haskell: 7,911; makefile: 3
file content (183 lines) | stat: -rw-r--r-- 6,417 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.StreamTable (
    -- * Types
    OddStreamTable (..),
    emptyOddStreamTable,
    EvenStreamTable (..),
    emptyEvenStreamTable,

    -- * Odd
    insertOdd,
    insertOdd',
    deleteOdd,
    lookupOdd,
    getOddConcurrency,
    getOddStreams,
    clearOddStreamTable,
    waitIncOdd,

    -- * Even
    insertEven,
    insertEven',
    deleteEven,
    lookupEven,
    getEvenConcurrency,
    clearEvenStreamTable,
    waitIncEven,
    insertEvenCache,
    deleteEvenCache,
    lookupEvenCache,
    getEvenStreams,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Data.IntMap.Strict (IntMap)
import qualified Data.IntMap.Strict as IntMap
import Network.Control (LRUCache)
import qualified Network.Control as LRUCache

import Imports
import Network.HTTP2.H2.Types (Stream (..))

----------------------------------------------------------------

-- | Bookkeeping for odd-numbered streams (presumably the client-initiated
-- side in HTTP/2 stream numbering -- confirm against callers).
data OddStreamTable = OddStreamTable
    { oddConc :: Int
    -- ^ Concurrency counter: incremented by 'insertOdd' and 'waitIncOdd',
    -- decremented by 'deleteOdd'.
    , oddTable :: IntMap Stream
    -- ^ Streams keyed by the 'IntMap.Key' handed to the insert functions.
    }

-- | The initial odd-stream table: a zero concurrency counter and no streams.
emptyOddStreamTable :: OddStreamTable
emptyOddStreamTable =
    OddStreamTable
        { oddConc = 0
        , oddTable = IntMap.empty
        }

-- | Bookkeeping for even-numbered streams (presumably the server-initiated
-- / pushed side in HTTP/2 stream numbering -- confirm against callers).
data EvenStreamTable = EvenStreamTable
    { evenConc :: Int
    -- ^ Concurrency counter: incremented by 'insertEven', 'waitIncEven' and
    -- 'insertEvenCache', decremented by 'deleteEven'.
    , evenTable :: IntMap Stream
    -- ^ Streams keyed by the 'IntMap.Key' handed to the insert functions
    -- (or by 'streamNumber' in 'insertEvenCache').
    , -- Cache must contain Stream instead of StreamId because
      -- a Stream is deleted when end-of-stream is received.
      -- After that, cache is looked up.
      -- LRUCache is not used as LRU but as fixed-size map.
      evenCache :: LRUCache (Method, ByteString) Stream
    -- ^ Streams keyed by the @(method, path)@ pair given to
    -- 'insertEvenCache'.
    }

-- | The initial even-stream table: a zero concurrency counter, no streams,
-- and an empty cache built with @LRUCache.empty lim@ (@lim@ is presumably
-- the cache capacity -- confirm against "Network.Control").
emptyEvenStreamTable :: Int -> EvenStreamTable
emptyEvenStreamTable lim =
    EvenStreamTable
        { evenConc = 0
        , evenTable = IntMap.empty
        , evenCache = LRUCache.empty lim
        }

----------------------------------------------------------------

-- | Register stream @v@ under key @k@ and increment the concurrency counter,
-- in a single STM transaction.
--
-- The update is applied with 'modifyTVar'' and the new fields are forced
-- before the record is stored, so the 'TVar' never holds a chain of
-- unevaluated updates.
insertOdd :: TVar OddStreamTable -> IntMap.Key -> Stream -> IO ()
insertOdd var k v = atomically $ modifyTVar' var $ \OddStreamTable{..} ->
    let oddConc' = oddConc + 1
        oddTable' = IntMap.insert k v oddTable
     in -- 'modifyTVar'' only reaches WHNF of the record; force the fields too.
        oddConc' `seq` oddTable' `seq` OddStreamTable oddConc' oddTable'

-- | Register stream @v@ under key @k@ without touching the concurrency
-- counter (compare 'insertOdd', which also increments it).
--
-- The update is applied with 'modifyTVar'' and the new map is forced before
-- the record is stored, so the 'TVar' never holds a chain of unevaluated
-- updates.
insertOdd' :: TVar OddStreamTable -> IntMap.Key -> Stream -> IO ()
insertOdd' var k v = atomically $ modifyTVar' var $ \OddStreamTable{..} ->
    let oddTable' = IntMap.insert k v oddTable
     in -- 'modifyTVar'' only reaches WHNF of the record; force the map too.
        oddTable' `seq` OddStreamTable oddConc oddTable'

-- | Drop key @k@ from the odd table and, if a stream was stored there, try to
-- hand @err@ to that stream's 'streamInput'.
--
-- The concurrency counter is decremented whether or not @k@ was present.
deleteOdd :: TVar OddStreamTable -> IntMap.Key -> SomeException -> IO ()
deleteOdd var k err =
    atomically removeEntry >>= maybe (return ()) notify
  where
    -- Best effort: 'tryPutMVar' does nothing when the MVar is already full,
    -- and its result is discarded.
    notify :: Stream -> IO ()
    notify strm = void $ tryPutMVar (streamInput strm) (Left err)

    -- Returns the stream that was stored under @k@ before the removal, if any.
    removeEntry :: STM (Maybe Stream)
    removeEntry = do
        tbl <- readTVar var
        let streams = oddTable tbl
        writeTVar var $
            tbl
                { oddConc = oddConc tbl - 1
                , oddTable = IntMap.delete k streams
                }
        return $ IntMap.lookup k streams

-- | Look up the stream stored under key @k@ in the current odd table.
-- The table is read with 'readTVarIO', outside any transaction.
lookupOdd :: TVar OddStreamTable -> IntMap.Key -> IO (Maybe Stream)
lookupOdd var k = do
    tbl <- readTVarIO var
    return $ IntMap.lookup k (oddTable tbl)

-- | Read the current value of the odd concurrency counter.
getOddConcurrency :: TVar OddStreamTable -> IO Int
getOddConcurrency var = fmap oddConc (readTVarIO var)

-- | Read the current map of odd streams.
getOddStreams :: TVar OddStreamTable -> IO (IntMap Stream)
getOddStreams var = fmap oddTable (readTVarIO var)

-- | Atomically reset the table to 'emptyOddStreamTable' and return the
-- streams it held just before the reset.
clearOddStreamTable :: TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable var = atomically $ do
    previous <- readTVar var
    writeTVar var emptyOddStreamTable
    return (oddTable previous)

-- | STM action that blocks (via 'retry') while the odd concurrency counter
-- is not below @maxConc@, then increments the counter. The stream map is
-- left unchanged.
waitIncOdd :: TVar OddStreamTable -> Int -> STM ()
waitIncOdd var maxConc = do
    tbl <- readTVar var
    let running = oddConc tbl
    if running < maxConc
        then writeTVar var tbl{oddConc = running + 1}
        else retry

----------------------------------------------------------------

-- | Register stream @v@ under key @k@ and increment the concurrency counter,
-- in a single STM transaction. The cache is left unchanged.
--
-- The update is applied with 'modifyTVar'' and the new fields are forced
-- before the record is stored, so the 'TVar' never holds a chain of
-- unevaluated updates.
insertEven :: TVar EvenStreamTable -> IntMap.Key -> Stream -> IO ()
insertEven var k v = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenConc' = evenConc + 1
        evenTable' = IntMap.insert k v evenTable
     in -- 'modifyTVar'' only reaches WHNF of the record; force the fields too.
        evenConc' `seq` evenTable' `seq` EvenStreamTable evenConc' evenTable' evenCache

-- | Register stream @v@ under key @k@ without touching the concurrency
-- counter or the cache (compare 'insertEven', which also increments the
-- counter).
--
-- The update is applied with 'modifyTVar'' and the new map is forced before
-- the record is stored, so the 'TVar' never holds a chain of unevaluated
-- updates.
insertEven' :: TVar EvenStreamTable -> IntMap.Key -> Stream -> IO ()
insertEven' var k v = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenTable' = IntMap.insert k v evenTable
     in -- 'modifyTVar'' only reaches WHNF of the record; force the map too.
        evenTable' `seq` EvenStreamTable evenConc evenTable' evenCache

-- | Drop key @k@ from the even table and, if a stream was stored there, try
-- to hand @err@ to that stream's 'streamInput'.
--
-- The concurrency counter is decremented whether or not @k@ was present;
-- the cache is left unchanged.
deleteEven :: TVar EvenStreamTable -> IntMap.Key -> SomeException -> IO ()
deleteEven var k err =
    atomically removeEntry >>= maybe (return ()) notify
  where
    -- Best effort: 'tryPutMVar' does nothing when the MVar is already full,
    -- and its result is discarded.
    notify :: Stream -> IO ()
    notify strm = void $ tryPutMVar (streamInput strm) (Left err)

    -- Returns the stream that was stored under @k@ before the removal, if any.
    removeEntry :: STM (Maybe Stream)
    removeEntry = do
        tbl <- readTVar var
        let streams = evenTable tbl
        writeTVar var $
            tbl
                { evenConc = evenConc tbl - 1
                , evenTable = IntMap.delete k streams
                }
        return $ IntMap.lookup k streams

-- | Look up the stream stored under key @k@ in the current even table.
-- The table is read with 'readTVarIO', outside any transaction.
lookupEven :: TVar EvenStreamTable -> IntMap.Key -> IO (Maybe Stream)
lookupEven var k = do
    tbl <- readTVarIO var
    return $ IntMap.lookup k (evenTable tbl)

-- | Read the current value of the even concurrency counter.
getEvenConcurrency :: TVar EvenStreamTable -> IO Int
getEvenConcurrency var = fmap evenConc (readTVarIO var)

-- | Atomically reset the table and return the streams it held just before
-- the reset.
--
-- The replacement table is @emptyEvenStreamTable 0@, i.e. its cache is
-- created with a limit of 0 rather than the limit the table started with.
clearEvenStreamTable :: TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable var = atomically $ do
    previous <- readTVar var
    writeTVar var (emptyEvenStreamTable 0)
    return (evenTable previous)

-- | STM action that blocks (via 'retry') while the even concurrency counter
-- is not below @maxConc@, then increments the counter. The stream map and
-- the cache are left unchanged.
waitIncEven :: TVar EvenStreamTable -> Int -> STM ()
waitIncEven var maxConc = do
    tbl <- readTVar var
    let running = evenConc tbl
    if running < maxConc
        then writeTVar var tbl{evenConc = running + 1}
        else retry

-- | Register @strm@ in both the stream map (keyed by its 'streamNumber') and
-- the cache (keyed by @(method, path)@), and increment the concurrency
-- counter, all in a single STM transaction.
--
-- The update is applied with 'modifyTVar'' and the new fields are forced
-- before the record is stored, so the 'TVar' never holds a chain of
-- unevaluated updates.
insertEvenCache
    :: TVar EvenStreamTable -> Method -> ByteString -> Stream -> IO ()
insertEvenCache var method path strm@Stream{..} = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenConc' = evenConc + 1
        evenTable' = IntMap.insert streamNumber strm evenTable
        evenCache' = LRUCache.insert (method, path) strm evenCache
     in -- 'modifyTVar'' only reaches WHNF of the record; force the fields too.
        evenConc' `seq`
            evenTable' `seq`
                evenCache' `seq`
                    EvenStreamTable evenConc' evenTable' evenCache'

-- | Remove the cache entry keyed by @(m, path)@. The stream map and the
-- concurrency counter are left unchanged.
--
-- The update is applied with 'modifyTVar'' and the new cache is forced
-- before the record is stored, so the 'TVar' never holds a chain of
-- unevaluated updates.
deleteEvenCache :: TVar EvenStreamTable -> Method -> ByteString -> IO ()
deleteEvenCache var m path = atomically $ modifyTVar' var $ \EvenStreamTable{..} ->
    let evenCache' = LRUCache.delete (m, path) evenCache
     in -- 'modifyTVar'' only reaches WHNF of the record; force the cache too.
        evenCache' `seq` EvenStreamTable evenConc evenTable evenCache'

-- | Look up the stream cached under @(m, path)@ in the current even table.
-- The table is read with 'readTVarIO', outside any transaction.
lookupEvenCache
    :: TVar EvenStreamTable -> Method -> ByteString -> IO (Maybe Stream)
lookupEvenCache var m path = do
    tbl <- readTVarIO var
    return $ LRUCache.lookup (m, path) (evenCache tbl)

-- | Read the current map of even streams.
getEvenStreams :: TVar EvenStreamTable -> IO (IntMap Stream)
getEvenStreams var = fmap evenTable (readTVarIO var)