-- | A helper module which takes care of parallelism
{-# LANGUAGE DeriveDataTypeable #-}
module Test.Tasty.Parallel (runInParallel) where
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Foreign.StablePtr
import Data.Typeable
import GHC.Conc (labelThread)

data Interrupt = Interrupt
  deriving Typeable
instance Show Interrupt where
  show Interrupt = "interrupted"
instance Exception Interrupt

data ParThreadKilled = ParThreadKilled SomeException
  deriving Typeable
instance Show ParThreadKilled where
  show (ParThreadKilled exn) =
    "tasty: one of the test running threads was killed by: " ++
    show exn
instance Exception ParThreadKilled

shutdown :: ThreadId -> IO ()
shutdown = flip throwTo Interrupt

-- | Take a list of actions and execute them in parallel, no more than @n@
-- at the same time.
--
-- The action itself is asynchronous, i.e. it returns immediately and does
-- the work in new threads. It returns an action which aborts tests and
-- cleans up.
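--
-- A minimal usage sketch (the two actions and the limit of 2 are
-- hypothetical and not taken from tasty itself; the caller is assumed to
-- wait for completion by its own means):
--
-- > abort <- runInParallel 2 [putStrLn "first", putStrLn "second"]
-- > -- the actions now run in background threads
-- > abort -- stop whatever is still running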
runInParallel
  :: Int -- ^ maximum number of parallel threads
  -> [IO ()] -- ^ list of actions to execute
  -> IO (IO ())
-- This implementation tries its best to ensure that exceptions are
-- properly propagated to the caller and threads are not left running.
--
-- Note that exceptions inside tests are already caught by the test
-- actions themselves. Any exceptions that reach this function or its
-- threads are by definition unexpected.
runInParallel nthreads actions = do
  callingThread <- myThreadId

  -- Don't let the main thread be garbage-collected.
  -- Otherwise we may get a "thread blocked indefinitely in an STM
  -- transaction" exception when a child thread is blocked and GC'd.
  -- (See e.g. https://github.com/feuerbach/tasty/issues/15)
  _ <- newStablePtr callingThread

  -- A variable containing all ThreadIds of forked threads.
  --
  -- These are the threads we'll need to kill if something goes wrong.
  pidsVar <- atomically $ newTVar []

  -- If an unexpected exception has been thrown and we started killing all
  -- the spawned threads, this flag will be set to False, so that any
  -- freshly spawned threads will know to terminate, even if their pids
  -- didn't make it to the "kill list" yet.
  aliveVar <- atomically $ newTVar True

  let
    -- Kill all threads.
    shutdownAll :: IO ()
    shutdownAll = do
      pids <- atomically $ do
        writeTVar aliveVar False
        readTVar pidsVar

      -- be sure not to kill myself!
      me <- myThreadId
      mapM_ shutdown $ filter (/= me) pids

    cleanup :: Either SomeException () -> IO ()
    cleanup Right {} = return ()
    cleanup (Left exn)
      | Just Interrupt <- fromException exn
        -- I'm being shut down either by a fellow thread (which caught an
        -- exception), or by the main thread which decided to stop running
        -- tests. In any case, just end silently.
        = return ()
      | otherwise = do
        -- Wow, I caught an exception (most probably an async one,
        -- although it doesn't really matter). Shut down all other
        -- threads, and re-throw my exception to the calling thread.
        shutdownAll
        throwTo callingThread $ ParThreadKilled exn

    forkCarefully :: IO () -> IO ThreadId
    forkCarefully action = flip myForkFinally cleanup $ do
      -- We cannot check liveness and update the pidsVar in one
      -- transaction before forking, because we don't know the new pid yet.
      --
      -- So we fork and then check/update. If something has happened in
      -- the meantime, it's not a big deal: we just cancel. OTOH, if
      -- we're alive at the time of the transaction, then we add our pid
      -- and will be killed when something happens.
      newPid <- myThreadId

      join . atomically $ do
        alive <- readTVar aliveVar
        if alive
          then do
            modifyTVar pidsVar (newPid :)
            return action
          else
            return (return ())

  capsVar <- atomically $ newTVar nthreads

  let
    go a cont = join . atomically $ do
      caps <- readTVar capsVar
      if caps > 0
        then do
          writeTVar capsVar $! caps - 1
          let
            release = atomically $ modifyTVar' capsVar (+1)

          -- Thanks to our exception handling, we won't deadlock even if
          -- an exception strikes before we 'release'. Everything will be
          -- killed, so why bother.
          return $ do
            pid <- forkCarefully (do a; release)
            labelThread pid "tasty_test_thread"
            cont

        else retry

  -- fork here as well, so that we can move to the UI without waiting
  -- until all tests have finished
  pid <- forkCarefully $ foldr go (return ()) actions
  labelThread pid "tasty_thread_manager"

  return shutdownAll

-- Copied from base to stay compatible with GHC 7.4.
myForkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
myForkFinally action and_then =
  mask $ \restore ->
    forkIO $ try (restore action) >>= and_then