1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
module Test.Framework.Runners.ThreadPool (
executeOnPool
) where
import Control.Concurrent
import Control.Monad
import qualified Data.IntMap as IM
import Foreign.StablePtr
data WorkerEvent token a = WorkerTermination
| WorkerItem token a
-- | Execute IO actions on several threads and return their results in the original
-- order. It is guaranteed that no action from the input list is executed unless all
-- the items that precede it in the list have been executed or are executing at that
-- moment.
executeOnPool :: Int -- ^ Number of threads to use
-> [IO a] -- ^ Actions to execute: these will be scheduled left to right
-> IO [a] -- ^ Ordered results of executing the given IO actions in parallel
executeOnPool n actions = do
-- Prepare the channels
input_chan <- newChan
output_chan <- newChan
-- Write the actions as items to the channel followed by one termination per thread
-- that indicates they should terminate. We do this on another thread for
-- maximum laziness (in case one the actions we are going to run depend on the
-- output from previous actions..)
_ <- forkIO $ writeList2Chan input_chan (zipWith WorkerItem [0..] actions ++ replicate n WorkerTermination)
-- Spawn workers
forM_ [1..n] (const $ forkIO $ poolWorker input_chan output_chan)
-- Short version: make sure we do the right thing if a test blocks on dead
-- MVars or TVars.
-- Long version: GHC is clever enough to throw an exception (BlockedOnDeadMVar
-- or BlockedIndefinitely) when a thread is waiting for a MVar or TVar that can't
-- be written to. However, it doesn't know anything about the handlers for those
-- exceptions. Therefore, when a worker runs a test that causes this exception,
-- since the main thread is blocking on the worker, the main thread gets the
-- exception too despite the fact that the main thread will be runnable as soon
-- as the worker catches its own exception. The below makes sure the main thread
-- is always reachable by the GC, which is the mechanism for finding threads
-- that are unrunnable.
-- See also the ticket where SimonM (semi-cryptically) explains this:
-- http://hackage.haskell.org/trac/ghc/ticket/3291
--
-- NB: this actually leaks stable pointers. We could prevent this by making
-- takeWhileWorkersExist do |unsafePerformIO newStablePtr| when returning the
-- lazily-demanded tail of the list, but its a bit of a pain. For now, just
-- grit our teeth and accept the leak.
_stablePtr <- myThreadId >>= newStablePtr
-- Return the results generated by the worker threads lazily and in
-- the same order as we got the inputs
fmap (reorderFrom 0 . takeWhileWorkersExist n) $ getChanContents output_chan
poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO ()
poolWorker input_chan output_chan = do
-- Read an action and work out whether we should continue or stop
action_item <- readChan input_chan
case action_item of
WorkerTermination -> writeChan output_chan WorkerTermination -- Must have run out of real actions to execute
WorkerItem token action -> do
-- Do the action then loop
result <- action
writeChan output_chan (WorkerItem token result)
poolWorker input_chan output_chan
-- | Keep grabbing items out of the infinite list of worker outputs until we have
-- recieved word that all of the workers have shut down. This lets us turn a possibly
-- infinite list of outputs into a certainly finite one suitable for use with reorderFrom.
takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)]
takeWhileWorkersExist worker_count events
| worker_count <= 0 = []
| otherwise = case events of
(WorkerTermination:events') -> takeWhileWorkersExist (worker_count - 1) events'
(WorkerItem token x:events') -> (token, x) : takeWhileWorkersExist worker_count events'
[] -> []
-- | This function carefully shuffles the input list so it in the total order
-- defined by the integers paired with the elements. If the list is @xs@ and
-- the supplied initial integer is @n@, it must be the case that:
--
-- > sort (map fst xs) == [n..n + (length xs - 1)]
--
-- This function returns items in the lazy result list as soon as it is sure
-- it has the right item for that position.
reorderFrom :: Int -> [(Int, a)] -> [a]
reorderFrom from initial_things = go from initial_things IM.empty False
where go next [] buf _
| IM.null buf = [] -- If the buffer and input list is empty, we're done
| otherwise = go next (IM.toList buf) IM.empty False -- Make sure we check the buffer even if the list is done
go next all_things@((token, x):things) buf buf_useful
| token == next -- If the list token matches the one we were expecting we can just take the item
= x : go (next + 1) things buf True -- Always worth checking the buffer now because the expected item has changed
| buf_useful -- If it's worth checking the buffer, it's possible the token we need is in it
, (Just x', buf') <- IM.updateLookupWithKey (const $ const Nothing) next buf -- Delete the found item from the map (if we find it) to save space
= x' : go (next + 1) all_things buf' True -- Always worth checking the buffer now because the expected item has changed
| otherwise -- Token didn't match, buffer unhelpful: it must be in the tail of the list
= go next things (IM.insert token x buf) False -- Since we've already checked the buffer, stop bothering to do so until something changes -}
|