1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
-- | Thread pool implementation.
module Development.Shake.Pool(Pool, addPool, blockPool, runPool) where
import Control.Concurrent
import Control.Exception hiding (blocked)
import Control.Monad
import General.Base
import General.Timing
import qualified Data.HashSet as Set
import System.IO.Unsafe
import System.Random
---------------------------------------------------------------------
-- UNFAIR/RANDOM QUEUE
-- Monad for non-deterministic (but otherwise pure) computations
type NonDet a = IO a
nonDet :: NonDet [Bool]
nonDet = do bs <- unsafeInterleaveIO nonDet
b <- randomIO
return $ b:bs
-- Left = deterministic list, Right = non-deterministic tree
data Queue a = Queue [a] (Either [a] (Maybe (Tree a)))
newQueue :: Bool -> Queue a
newQueue deterministic = Queue [] $ if deterministic then Left [] else Right Nothing
enqueuePriority :: a -> Queue a -> Queue a
enqueuePriority x (Queue p t) = Queue (x:p) t
enqueue :: a -> Queue a -> NonDet (Queue a)
enqueue x (Queue p (Left xs)) = return $ Queue p $ Left $ x:xs
enqueue x (Queue p (Right Nothing)) = return $ Queue p $ Right $ Just $ Leaf x
enqueue x (Queue p (Right (Just t))) = do bs <- nonDet; return $ Queue p $ Right $ Just $ insertTree bs x t
dequeue :: Queue a -> Maybe (NonDet (a, Queue a))
dequeue (Queue (p:ps) t) = Just $ return (p, Queue ps t)
dequeue (Queue [] (Left (x:xs))) = Just $ return (x, Queue [] $ Left xs)
dequeue (Queue [] (Left [])) = Nothing
dequeue (Queue [] (Right (Just t))) = Just $ do bs <- nonDet; (x,t) <- return $ removeTree bs t; return (x, Queue [] $ Right t)
dequeue (Queue [] (Right Nothing)) = Nothing
---------------------------------------------------------------------
-- TREE
-- Note that for a Random tree, since everything is Random, Branch x y =~= Branch y x
data Tree a = Leaf a | Branch (Tree a) (Tree a)
insertTree :: [Bool] -> a -> Tree a -> Tree a
insertTree _ x (Leaf y) = Branch (Leaf x) (Leaf y)
insertTree (b:bs) x (Branch y z) = if b then f y z else f z y
where f y z = Branch y (insertTree bs x z)
removeTree :: [Bool] -> Tree a -> (a, Maybe (Tree a))
removeTree _ (Leaf x) = (x, Nothing)
removeTree (b:bs) (Branch y z) = if b then f y z else f z y
where
f y z = case removeTree bs z of
(x, Nothing) -> (x, Just y)
(x, Just z) -> (x, Just $ Branch y z)
---------------------------------------------------------------------
-- THREAD POOL
{-
Must keep a list of active threads, so can raise exceptions in a timely manner
Must spawn a fresh thread to do blockPool
If any worker throws an exception, must signal to all the other workers
-}
data Pool = Pool {-# UNPACK #-} !Int !(Var (Maybe S)) !(Barrier (Either SomeException S))
data S = S
{threads :: !(Set.HashSet ThreadId) -- IMPORTANT: Must be strict or we leak thread stackssss
,threadsMax :: {-# UNPACK #-} !Int -- high water mark of Set.size threads
,threadsSum :: {-# UNPACK #-} !Int -- number of threads we have been through
,working :: {-# UNPACK #-} !Int -- threads which are actively working
,blocked :: {-# UNPACK #-} !Int -- threads which are blocked
,todo :: !(Queue (IO ()))
}
emptyS :: Bool -> S
emptyS deterministic = S Set.empty 0 0 0 0 $ newQueue deterministic
-- | Given a pool, and a function that breaks the S invariants, restore them
-- They are only allowed to touch working or todo
step :: Pool -> (S -> NonDet S) -> IO ()
step pool@(Pool n var done) op = do
let onVar act = modifyVar_ var $ maybe (return Nothing) act
onVar $ \s -> do
s <- op s
res <- maybe (return Nothing) (fmap Just) $ dequeue $ todo s
case res of
Just (now, todo2) | working s < n -> do
-- spawn a new worker
t <- forkIO $ do
t <- myThreadId
res <- try now
case res of
Left e -> onVar $ \s -> do
mapM_ killThread $ Set.toList $ Set.delete t $ threads s
signalBarrier done $ Left e
return Nothing
Right _ -> step pool $ \s -> return s{working = working s - 1, threads = Set.delete t $ threads s}
let threads2 = Set.insert t $ threads s
return $ Just s{working = working s + 1, todo = todo2, threads = threads2
,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` Set.size threads2}
Nothing | working s == 0 && blocked s == 0 -> do
signalBarrier done $ Right s
return Nothing
_ -> return $ Just s
-- | Add a new task to the pool
addPool :: Pool -> IO a -> IO ()
addPool pool act = step pool $ \s -> do
todo <- enqueue (void act) (todo s)
return s{todo = todo}
-- | A blocking action is being run while on the pool, yield your thread.
-- Should only be called by an action under addPool.
--
-- If the first part of the result is True then the result is sufficiently high
-- priority that you may exceed the pool limit to get it done immediately.
-- Always the result of a child thread raising an error, which will probably
-- raise an error in the parent.
blockPool :: Pool -> IO (Bool, a) -> IO a
blockPool pool act = do
step pool $ \s -> return s{working = working s - 1, blocked = blocked s + 1}
(urgent,res) <- act
var <- newBarrier
let act = do
step pool $ \s -> return s{working = working s + 1, blocked = blocked s - 1}
signalBarrier var ()
if urgent then
act -- may exceed the pool count
else
step pool $ \s -> return s{todo = enqueuePriority act $ todo s}
waitBarrier var
return res
-- | Run all the tasks in the pool on the given number of works.
-- If any thread throws an exception, the exception will be reraised.
runPool :: Bool -> Int -> (Pool -> IO ()) -> IO () -- run all tasks in the pool
runPool deterministic n act = do
s <- newVar $ Just $ emptyS deterministic
let cleanup = modifyVar_ s $ \s -> do
-- if someone kills our thread, make sure we kill our child threads
case s of
Just s -> mapM_ killThread $ Set.toList $ threads s
Nothing -> return ()
return Nothing
flip onException cleanup $ do
res <- newBarrier
let pool = Pool n s res
addPool pool $ act pool
res <- waitBarrier res
case res of
Left e -> throw e
Right s -> addTiming $ "Pool finished (" ++ show (threadsSum s) ++ " threads, " ++ show (threadsMax s) ++ " max)"
|