File: ThreadManager.hs

package info (click to toggle)
haskell-time-manager 0.2.4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 64 kB
  • sloc: haskell: 320; makefile: 2
file content (218 lines) | stat: -rw-r--r-- 7,272 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A thread manager including a time manager.
--   The manager has responsibility to kill managed threads.
module System.ThreadManager (
    ThreadManager,
    newThreadManager,
    stopAfter,
    KilledByThreadManager (..),

    -- * Fork
    forkManaged,
    forkManagedFinally,
    forkManagedUnmask,
    forkManagedTimeout,
    forkManagedTimeoutFinally,

    -- * Synchronization
    waitUntilAllGone,
    isAllGone,

    -- * Re-exports
    T.Manager,
    withHandle,
    T.Handle,
    T.tickle,
    T.pause,
    T.resume,
) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (Exception (..), SomeException (..))
import qualified Control.Exception as E
import Control.Monad (unless, void)
import Data.Foldable (forM_)
import Data.IORef
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Word (Word64)
import GHC.Conc.Sync (labelThread)
#if __GLASGOW_HASKELL__ >= 908
import GHC.Conc.Sync (fromThreadId)
#endif
import System.Mem.Weak (Weak, deRefWeak)
import qualified System.TimeManager as T

----------------------------------------------------------------

-- | Manager to manage the thread and the timer.
data ThreadManager = ThreadManager T.Manager (TVar ManagedThreads)

type Key = Word64
type ManagedThreads = Map Key ManagedThread

----------------------------------------------------------------

-- 'IORef' prevents race between WAI TimeManager (TimeoutThread)
-- and stopAfter (KilledByThreadManager).
-- It is initialized with 'False' and turned into 'True' when locked.
-- The winner can throw an asynchronous exception.
data ManagedThread = ManagedThread (Weak ThreadId) (IORef Bool)

----------------------------------------------------------------

-- | Starting a thread manager.
--   Its action is initially set to 'return ()' and should be set
--   by 'setAction'. This allows that the action can include
--   the manager itself.
newThreadManager :: T.Manager -> IO ThreadManager
newThreadManager timmgr = ThreadManager timmgr <$> newTVarIO Map.empty

----------------------------------------------------------------

-- | An exception used internally to kill a managed thread.
data KilledByThreadManager = KilledByThreadManager (Maybe SomeException)
    deriving (Show)

instance Exception KilledByThreadManager where
    toException = E.asyncExceptionToException
    fromException = E.asyncExceptionFromException

-- | Stopping the manager.
--
-- The action is run in the scope of an exception handler that catches all
-- exceptions (including asynchronous ones); this allows the cleanup handler
-- to cleanup in all circumstances. If an exception is caught, it is rethrown
-- after the cleanup is complete.
stopAfter :: ThreadManager -> IO a -> (Maybe SomeException -> IO ()) -> IO a
stopAfter (ThreadManager _timmgr var) action cleanup = do
    E.mask $ \unmask -> do
        ma <- E.try $ unmask action
        m <- atomically $ do
            m0 <- readTVar var
            writeTVar var Map.empty
            return m0
        let ths = Map.elems m
            er = either Just (const Nothing) ma
            ex = KilledByThreadManager er
        forM_ ths $ \(ManagedThread wtid ref) -> lockAndKill wtid ref ex
        case ma of
            Left err -> cleanup (Just err) >> E.throwIO err
            Right a -> cleanup Nothing >> return a

----------------------------------------------------------------

-- | Fork a managed thread.
--
-- This guarantees that the thread ID is added to the manager's queue before
-- the thread starts, and is removed again when the thread terminates
-- (normally or abnormally).
forkManaged
    :: ThreadManager
    -> String
    -- ^ Thread name
    -> IO ()
    -- ^ Action
    -> IO ()
forkManaged mgr label io =
    forkManagedUnmask mgr label $ \unmask -> unmask io

-- | Like 'forkManaged', but run action with exceptions masked
forkManagedUnmask
    :: ThreadManager -> String -> ((forall x. IO x -> IO x) -> IO ()) -> IO ()
forkManagedUnmask (ThreadManager _timmgr var) label io =
    void $ E.mask_ $ forkIOWithUnmask $ \unmask -> E.handle ignore $ do
        labelMe label
        E.bracket (setup var) (clear var) $ \_ -> io unmask

-- | Fork a managed thread with a handle created by a timeout manager.
forkManagedTimeout :: ThreadManager -> String -> (T.Handle -> IO ()) -> IO ()
forkManagedTimeout (ThreadManager timmgr var) label io =
    void $ forkIO $ E.handle ignore $ do
        labelMe label
        E.bracket (setup var) (clear var) $ \(_n, wtid, ref) ->
            -- 'TimeoutThread' is ignored by 'withHandle'.
            void $ T.withHandle timmgr (lockAndKill wtid ref T.TimeoutThread) io

-- | Fork a managed thread with a cleanup function.
forkManagedFinally :: ThreadManager -> String -> IO () -> IO () -> IO ()
forkManagedFinally mgr label io final = E.mask $ \restore ->
    forkManaged
        mgr
        label
        (E.try (restore io) >>= \(_ :: Either E.SomeException ()) -> final)

-- | Fork a managed thread with a handle created by a timeout manager
-- and with a cleanup function.
forkManagedTimeoutFinally
    :: ThreadManager -> String -> (T.Handle -> IO ()) -> IO () -> IO ()
forkManagedTimeoutFinally mgr label io final = E.mask $ \restore ->
    forkManagedTimeout
        mgr
        label
        (\th -> E.try (restore $ io th) >>= \(_ :: Either E.SomeException ()) -> final)

setup :: TVar (Map Key ManagedThread) -> IO (Key, Weak ThreadId, IORef Bool)
setup var = do
    (wtid, n) <- myWeakThradId
    ref <- newIORef False
    let ent = ManagedThread wtid ref
    -- asking to throw KilledByThreadManager to me
    atomically $ modifyTVar' var $ Map.insert n ent
    return (n, wtid, ref)

lockAndKill :: Exception e => Weak ThreadId -> IORef Bool -> e -> IO ()
lockAndKill wtid ref e = do
    alreadyLocked <- atomicModifyIORef' ref (\b -> (True, b)) -- try to lock
    unless alreadyLocked $ do
        mtid <- deRefWeak wtid
        case mtid of
            Nothing -> return ()
            Just tid -> E.throwTo tid e

clear
    :: TVar (Map Key ManagedThread)
    -> (Key, Weak ThreadId, IORef Bool)
    -> IO ()
clear var (n, _, _) = atomically $ modifyTVar' var $ Map.delete n

ignore :: KilledByThreadManager -> IO ()
ignore (KilledByThreadManager _) = return ()

-- | Wait until all managed thread are finished.
waitUntilAllGone :: ThreadManager -> IO ()
waitUntilAllGone (ThreadManager _timmgr var) = atomically $ do
    m <- readTVar var
    check (Map.size m == 0)

isAllGone :: ThreadManager -> STM Bool
isAllGone (ThreadManager _timmgr var) = do
    m <- readTVar var
    return (Map.size m == 0)

----------------------------------------------------------------

myWeakThradId :: IO (Weak ThreadId, Key)
myWeakThradId = do
    tid <- myThreadId
    wtid <- mkWeakThreadId tid
    let n = fromThreadId tid
    return (wtid, n)

labelMe :: String -> IO ()
labelMe l = do
    tid <- myThreadId
    labelThread tid l

withHandle
    :: ThreadManager -> T.TimeoutAction -> (T.Handle -> IO a) -> IO (Maybe a)
withHandle (ThreadManager timmgr _) = T.withHandle timmgr

#if __GLASGOW_HASKELL__ < 908
fromThreadId :: ThreadId -> Word64
fromThreadId tid = read (drop 9 $ show tid)
#endif