1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | A thread manager.
-- The manager has responsibility to spawn and kill
-- worker threads.
module Network.HTTP2.H2.Manager (
Manager,
Action,
start,
setAction,
stopAfter,
spawnAction,
forkManaged,
forkManagedUnmask,
timeoutKillThread,
timeoutClose,
KilledByHttp2ThreadManager (..),
incCounter,
decCounter,
waitCounter0,
) where
import Control.Exception
import Data.Foldable
import Data.IORef
import Data.Set (Set)
import qualified Data.Set as Set
import qualified System.TimeManager as T
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM
import Imports
----------------------------------------------------------------
-- | Action to be spawned by the manager.
type Action = IO ()
noAction :: Action
noAction = return ()
data Command = Stop (Maybe SomeException) | Spawn | Add ThreadId | Delete ThreadId
-- | Manager to manage the thread and the timer.
data Manager = Manager (TQueue Command) (IORef Action) (TVar Int) T.Manager
-- | Starting a thread manager.
-- Its action is initially set to 'return ()' and should be set
-- by 'setAction'. This allows that the action can include
-- the manager itself.
start :: T.Manager -> IO Manager
start timmgr = do
q <- newTQueueIO
ref <- newIORef noAction
cnt <- newTVarIO 0
void $ forkIO $ go q Set.empty ref
return $ Manager q ref cnt timmgr
where
go q tset0 ref = do
x <- atomically $ readTQueue q
case x of
Stop err -> kill tset0 err
Spawn -> next tset0
Add newtid ->
let tset = add newtid tset0
in go q tset ref
Delete oldtid ->
let tset = del oldtid tset0
in go q tset ref
where
next tset = do
action <- readIORef ref
newtid <- forkFinally action $ \_ -> do
mytid <- myThreadId
atomically $ writeTQueue q $ Delete mytid
let tset' = add newtid tset
go q tset' ref
-- | Setting the action to be spawned.
setAction :: Manager -> Action -> IO ()
setAction (Manager _ ref _ _) action = writeIORef ref action
-- | Stopping the manager.
stopAfter :: Manager -> IO a -> (Either SomeException a -> IO b) -> IO b
stopAfter (Manager q _ _ _) action cleanup = do
mask $ \unmask -> do
ma <- try $ unmask action
atomically $ writeTQueue q $ Stop (either Just (const Nothing) ma)
cleanup ma
-- | Spawning the action.
spawnAction :: Manager -> IO ()
spawnAction (Manager q _ _ _) = atomically $ writeTQueue q Spawn
----------------------------------------------------------------
-- | Fork managed thread
--
-- This guarantees that the thread ID is added to the manager's queue before
-- the thread starts, and is removed again when the thread terminates
-- (normally or abnormally).
forkManaged :: Manager -> IO () -> IO ()
forkManaged mgr io =
forkManagedUnmask mgr $ \unmask -> unmask io
-- | Like 'forkManaged', but run action with exceptions masked
forkManagedUnmask :: Manager -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask mgr io =
void $ mask_ $ forkIOWithUnmask $ \unmask -> do
addMyId mgr
-- We catch the exception and do not rethrow it: we don't want the
-- exception printed to stderr.
io unmask `catch` \(_e :: SomeException) -> return ()
deleteMyId mgr
-- | Adding my thread id to the kill-thread list on stopping.
--
-- This is not part of the public API; see 'forkManaged' instead.
addMyId :: Manager -> IO ()
addMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Add tid
-- | Deleting my thread id from the kill-thread list on stopping.
--
-- This is /only/ necessary when you want to remove the thread's ID from
-- the manager /before/ the thread terminates (thereby assuming responsibility
-- for thread cleanup yourself).
deleteMyId :: Manager -> IO ()
deleteMyId (Manager q _ _ _) = do
tid <- myThreadId
atomically $ writeTQueue q $ Delete tid
----------------------------------------------------------------
add :: ThreadId -> Set ThreadId -> Set ThreadId
add tid set = set'
where
set' = Set.insert tid set
del :: ThreadId -> Set ThreadId -> Set ThreadId
del tid set = set'
where
set' = Set.delete tid set
kill :: Set ThreadId -> Maybe SomeException -> IO ()
kill set err = traverse_ (\tid -> E.throwTo tid $ KilledByHttp2ThreadManager err) set
-- | Killing the IO action of the second argument on timeout.
timeoutKillThread :: Manager -> (T.Handle -> IO a) -> IO a
timeoutKillThread (Manager _ _ _ tmgr) action = E.bracket register T.cancel action
where
register = T.registerKillThread tmgr noAction
-- | Registering closer for a resource and
-- returning a timer refresher.
timeoutClose :: Manager -> IO () -> IO (IO ())
timeoutClose (Manager _ _ _ tmgr) closer = do
th <- T.register tmgr closer
return $ T.tickle th
data KilledByHttp2ThreadManager = KilledByHttp2ThreadManager (Maybe SomeException)
deriving (Show)
instance Exception KilledByHttp2ThreadManager where
toException = asyncExceptionToException
fromException = asyncExceptionFromException
----------------------------------------------------------------
incCounter :: Manager -> IO ()
incCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (+ 1)
decCounter :: Manager -> IO ()
decCounter (Manager _ _ cnt _) = atomically $ modifyTVar' cnt (subtract 1)
waitCounter0 :: Manager -> IO ()
waitCounter0 (Manager _ _ cnt _) = atomically $ do
n <- readTVar cnt
checkSTM (n < 1)
|