File: Pool.hs

package info (click to toggle)
haskell-shake 0.13.2%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 888 kB
  • ctags: 127
  • sloc: haskell: 6,388; makefile: 35; ansic: 25; sh: 2
file content (170 lines) | stat: -rw-r--r-- 6,680 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170

-- | Thread pool implementation.
module Development.Shake.Pool(Pool, addPool, blockPool, runPool) where

import Control.Concurrent
import Control.Exception hiding (blocked)
import Control.Monad
import General.Base
import General.Timing
import qualified Data.HashSet as Set
import System.IO.Unsafe
import System.Random


---------------------------------------------------------------------
-- UNFAIR/RANDOM QUEUE

-- | Monad for non-deterministic (but otherwise pure) computations.
--   This is only a type alias for 'IO'; it marks actions whose sole effect
--   is intended to be the drawing of random values.
type NonDet a = IO a

-- | An infinite list of random booleans. The head is drawn from 'randomIO'
--   when the action runs; the tail is wrapped in 'unsafeInterleaveIO', so
--   each further element is only drawn when the list is forced that far.
nonDet :: NonDet [Bool]
nonDet = liftM2 (:) randomIO (unsafeInterleaveIO nonDet)

-- | A queue of pending items.
--   The first field is the priority list, which 'dequeue' always serves
--   before looking at the second field.
--   For the second field: Left = deterministic list, Right = non-deterministic tree
--   (with 'Nothing' meaning the tree currently holds no items).
data Queue a = Queue [a] (Either [a] (Maybe (Tree a)))

-- | Create an empty queue. Passing 'True' selects the deterministic
--   list-backed store, 'False' the randomised tree-backed store.
newQueue :: Bool -> Queue a
newQueue deterministic
    | deterministic = Queue [] (Left [])
    | otherwise = Queue [] (Right Nothing)

-- | Add an item to the front of the priority list, leaving the ordinary
--   store untouched. 'dequeue' hands out priority items first.
enqueuePriority :: a -> Queue a -> Queue a
enqueuePriority item (Queue priority store) = Queue (item : priority) store

-- | Add an ordinary (non-priority) item to the queue.
--   For a deterministic queue the item is consed onto the front of the list.
--   For a non-deterministic queue it becomes the only leaf of a fresh tree,
--   or is inserted into the existing tree at a position chosen by 'nonDet'.
enqueue :: a -> Queue a -> NonDet (Queue a)
enqueue x (Queue p store) = case store of
    Left xs -> return $ Queue p (Left (x : xs))
    Right Nothing -> return $ Queue p (Right (Just (Leaf x)))
    Right (Just t) -> do
        bs <- nonDet
        return $ Queue p (Right (Just (insertTree bs x t)))

-- | Take one item from the queue, or return 'Nothing' if it is empty.
--   Priority items are always served first; otherwise the item comes from
--   the head of the deterministic list, or from a leaf of the tree chosen
--   by 'nonDet'. The result pairs the item with the remaining queue.
dequeue :: Queue a -> Maybe (NonDet (a, Queue a))
dequeue (Queue (p:ps) store) = Just $ return (p, Queue ps store)
dequeue (Queue [] store) = case store of
    Left (x:xs) -> Just $ return (x, Queue [] (Left xs))
    Left [] -> Nothing
    Right Nothing -> Nothing
    Right (Just t) -> Just $ do
        bs <- nonDet
        -- match the pair strictly, so the random path is consumed here
        case removeTree bs t of
            (x, rest) -> return (x, Queue [] (Right rest))


---------------------------------------------------------------------
-- TREE

-- | A non-empty binary tree that stores its values only at the leaves.
--   Note that for a Random tree, since everything is Random, Branch x y =~= Branch y x
data Tree a = Leaf a | Branch (Tree a) (Tree a)

-- | Insert a value into the tree, using the list of booleans to choose which
--   child to descend into at every 'Branch'. On reaching a 'Leaf' the new
--   value is paired with it in a fresh 'Branch'.
--   There is no equation for an empty list at a 'Branch'; callers in this
--   file pass the infinite list produced by 'nonDet'.
insertTree :: [Bool] -> a -> Tree a -> Tree a
insertTree _ x (Leaf y) = Branch (Leaf x) (Leaf y)
insertTree (b:bs) x (Branch l r)
    | b = Branch l (insertTree bs x r)
    | otherwise = Branch r (insertTree bs x l)

-- | Remove one leaf from the tree, using the list of booleans to choose
--   which child to descend into at every 'Branch'. Returns the removed value
--   and what is left of the tree ('Nothing' if that was the last leaf).
--   There is no equation for an empty list at a 'Branch'; callers in this
--   file pass the infinite list produced by 'nonDet'.
removeTree :: [Bool] -> Tree a -> (a, Maybe (Tree a))
removeTree _ (Leaf x) = (x, Nothing)
removeTree (b:bs) (Branch l r)
    | b = descend l r
    | otherwise = descend r l
    where
        -- remove from the second subtree, keeping the first one intact
        descend keep from = case removeTree bs from of
            (x, Nothing) -> (x, Just keep)
            (x, Just rest) -> (x, Just (Branch keep rest))


---------------------------------------------------------------------
-- THREAD POOL

{-
Must keep a list of active threads, so we can raise exceptions in a timely manner
Must spawn a fresh thread to do blockPool
If any worker throws an exception, must signal to all the other workers
-}

-- | A thread pool. The fields are, in order:
--
--   * the limit on the number of working threads, checked before a new
--     worker is spawned;
--
--   * the mutable pool state, which is set to 'Nothing' once the pool has
--     finished, failed, or been cleaned up;
--
--   * a barrier signalled with 'Left' when a task throws an exception, or
--     with 'Right' and the final state when there is nothing left to do.
data Pool = Pool {-# UNPACK #-} !Int !(Var (Maybe S)) !(Barrier (Either SomeException S))

-- | The mutable state of a running pool.
data S = S
    {threads :: !(Set.HashSet ThreadId) -- IMPORTANT: Must be strict or we leak thread stacks
    ,threadsMax :: {-# UNPACK #-} !Int -- high water mark of Set.size threads
    ,threadsSum :: {-# UNPACK #-} !Int -- number of threads we have been through
    ,working :: {-# UNPACK #-} !Int -- threads which are actively working
    ,blocked :: {-# UNPACK #-} !Int -- threads which are blocked
    ,todo :: !(Queue (IO ())) -- tasks waiting to be started
    }


-- | The initial pool state: no threads, all counters at zero and an empty
--   queue. The argument is passed on to 'newQueue'.
emptyS :: Bool -> S
emptyS deterministic = S
    {threads = Set.empty
    ,threadsMax = 0
    ,threadsSum = 0
    ,working = 0
    ,blocked = 0
    ,todo = newQueue deterministic
    }


-- | Given a pool, and a function that breaks the S invariants, restore them
--   They are only allowed to touch working or todo
--   (NOTE(review): callers in this file also adjust @blocked@ and @threads@).
--
--   After applying @op@ to the state, at most one queued task is started, and
--   only while fewer than the pool limit are working. If nothing is queued and
--   no thread is working or blocked, the barrier is signalled with the final
--   state and the pool state becomes 'Nothing'. If the pool state is already
--   'Nothing', neither @op@ nor anything else is run.
step :: Pool -> (S -> NonDet S) -> IO ()
step pool@(Pool n var done) op = do
    -- update the pool state, leaving it alone if it is already Nothing
    let onVar act = modifyVar_ var $ maybe (return Nothing) act
    onVar $ \s -> do
        s <- op s
        -- take the next task off the queue; the result is discarded below if
        -- we are already at the thread limit
        res <- maybe (return Nothing) (fmap Just) $ dequeue $ todo s
        case res of
            Just (now, todo2) | working s < n -> do
                -- spawn a new worker
                t <- forkIO $ do
                    t <- myThreadId
                    res <- try now
                    case res of
                        -- the task threw: kill all other workers, report the
                        -- exception and shut the pool down
                        Left e -> onVar $ \s -> do
                            mapM_ killThread $ Set.toList $ Set.delete t $ threads s
                            signalBarrier done $ Left e
                            return Nothing
                        -- the task finished: release our slot, which may start
                        -- the next queued task or complete the pool
                        Right _ -> step pool $ \s -> return s{working = working s - 1, threads = Set.delete t $ threads s}
                let threads2 = Set.insert t $ threads s
                return $ Just s{working = working s + 1, todo = todo2, threads = threads2
                               ,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` Set.size threads2}
            Nothing | working s == 0 && blocked s == 0 -> do
                -- nothing queued, nothing working, nothing blocked: we are done
                signalBarrier done $ Right s
                return Nothing
            -- otherwise keep the state as it is after op (the queue is not advanced)
            _ -> return $ Just s


-- | Add a new task to the pool. The result of the task is discarded.
--   The task is placed on the ordinary (non-priority) queue and 'step' then
--   decides whether it can be started straight away.
addPool :: Pool -> IO a -> IO ()
addPool pool act = step pool enqueueTask
    where
        enqueueTask s = do
            queue <- enqueue (void act) (todo s)
            return s{todo = queue}


-- | Run a blocking action from a thread that is on the pool, giving up the
--   thread's working slot for the duration.
--   Should only be called by an action under 'addPool'.
--
--   The action returns a flag together with its result. If the flag is True
--   the result is urgent enough that the pool limit may be exceeded to resume
--   immediately; this is always the result of a child thread raising an error,
--   which will probably raise an error in the parent. If the flag is False the
--   resumption is queued as a priority task and this call waits until it runs.
blockPool :: Pool -> IO (Bool, a) -> IO a
blockPool pool act = do
    -- count ourselves as blocked rather than working while the action runs
    step pool $ \s -> return s{working = working s - 1, blocked = blocked s + 1}
    (urgent, res) <- act
    resumed <- newBarrier
    let resume = do
            -- count ourselves as working again, then wake the waiting caller
            step pool $ \s -> return s{working = working s + 1, blocked = blocked s - 1}
            signalBarrier resumed ()
    case urgent of
        True -> resume -- may exceed the pool count
        False -> step pool $ \s -> return s{todo = enqueuePriority resume $ todo s}
    waitBarrier resumed
    return res


-- | Run all the tasks in the pool on the given number of workers.
--   If any thread throws an exception, the exception will be reraised.
--
--   The first argument selects the deterministic queue (see 'newQueue'), the
--   second is the limit on simultaneously working threads, and the third is
--   the initial task, which receives the pool so that it can add more tasks.
--   Blocks until the pool signals that it has finished or failed.
runPool :: Bool -> Int -> (Pool -> IO ()) -> IO () -- run all tasks in the pool
runPool deterministic n act = do
    var <- newVar $ Just $ emptyS deterministic
    let cleanup = modifyVar_ var $ \state -> do
            -- if someone kills our thread, make sure we kill our child threads
            case state of
                Just s -> mapM_ killThread $ Set.toList $ threads s
                Nothing -> return ()
            return Nothing
    flip onException cleanup $ do
        done <- newBarrier
        let pool = Pool n var done
        addPool pool $ act pool
        res <- waitBarrier done
        case res of
            -- throwIO (not throw) so the exception is raised when this IO
            -- action is executed, in sequence with the surrounding actions
            Left e -> throwIO e
            Right s -> addTiming $ "Pool finished (" ++ show (threadsSum s) ++ " threads, " ++ show (threadsMax s) ++ " max)"