1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218
|
{-# LANGUAGE CPP #-}
-- | Gang Primitives.
module Data.Array.Repa.Eval.Gang
( theGang
, Gang, forkGang, gangSize, gangIO, gangST)
where
import GHC.IO
import GHC.ST
import GHC.Conc (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
import GHC.Conc (numCapabilities)
import System.IO
-- TheGang --------------------------------------------------------------------
-- | This globally shared gang is auto-initialised at startup and shared by all
-- Repa computations.
--
-- In a data parallel setting, it does not help to have multiple gangs
-- running at the same time. This is because a single data parallel
-- computation should already be able to keep all threads busy. If we had
-- multiple gangs running at the same time, then the system as a whole would
-- run slower as the gangs would contend for cache and thrash the scheduler.
--
-- If, due to laziness or otherwise, you try to start multiple parallel
-- Repa computations at the same time, then you will get the following
-- warning on stderr at runtime:
--
-- @Data.Array.Repa: Performing nested parallel computation sequentially.
-- You've probably called the 'compute' or 'copy' function while another
-- instance was already running. This can happen if the second version
-- was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure that
-- each array is fully evaluated before you 'compute' the next one.
-- @
--
theGang :: Gang
{-# NOINLINE theGang #-}
-- The NOINLINE pragma is the usual guard for a top-level 'unsafePerformIO':
-- it is meant to stop the call being duplicated at use sites, which would
-- fork more than one gang. The gang gets one thread per capability.
theGang = unsafePerformIO (forkGang numCapabilities)
-- Requests -------------------------------------------------------------------
-- | A work request sent to one individual member of a gang.
data Req
  = ReqDo (Int -> IO ())
    -- ^ Run the given action. The worker applies it to its own thread
    --   index.
  | ReqShutdown
    -- ^ The gang is being shut down. The worker signals that it has
    --   received the request by writing to its result var, and then
    --   stops waiting for further requests.
-- Gang -----------------------------------------------------------------------
-- | A group of worker threads that execute arbitrary work requests.
data Gang
  = Gang
      { _gangThreads     :: !Int
        -- ^ How many threads the gang contains.
      , _gangRequestVars :: [MVar Req]
        -- ^ One var per worker; a worker waits on its var for requests.
      , _gangResultVars  :: [MVar ()]
        -- ^ One var per worker; a worker fills its var once a request
        --   has been handled.
      , _gangBusy        :: MVar Bool
        -- ^ Holds 'True' while the gang is busy with a set of requests.
      }
-- | Renders only the thread count, e.g. @<<4 threads>>@; the request and
--   result vars are not shown.
instance Show Gang where
  showsPrec prec (Gang threads _ _ _)
    = \rest -> "<<" ++ showsPrec prec threads (" threads>>" ++ rest)
-- | O(1). The number of worker threads that make up the 'Gang'.
gangSize :: Gang -> Int
gangSize = _gangThreads
-- | Fork a 'Gang' with the given number of threads (at least 1).
--
--   The thread count is only checked with 'assert'. Worker @i@ is started
--   with 'forkOn' @i@ and is handed @i@ as its thread index.
forkGang :: Int -> IO Gang
forkGang n
  = assert (n > 0)
  $ do
      -- One request var and one completion var per worker, all empty.
      requestVars <- replicateM n newEmptyMVar
      doneVars    <- replicateM n newEmptyMVar

      -- Attach a finaliser to each request var so that the matching
      -- worker is shut down cleanly once the var becomes unreachable.
      forM_ (zip requestVars doneVars) $ \(reqVar, doneVar) ->
        mkWeakMVar reqVar (finaliseWorker reqVar doneVar)

      -- Start the worker threads.
      let workers = zipWith3 gangWorker [0 .. n-1] requestVars doneVars
      zipWithM_ forkOn [0..] workers

      -- A freshly forked gang is idle.
      busyVar <- newMVar False
      return (Gang n requestVars doneVars busyVar)
-- | The body of a single worker thread of a 'Gang'.
--
--   The thread blocks on its request var until a request arrives. For a
--   'ReqDo' it runs the action with its own thread index, signals
--   completion on the done var and goes back to waiting. For a
--   'ReqShutdown' it signals on the done var and returns.
gangWorker :: Int -> MVar Req -> MVar () -> IO ()
gangWorker threadId varRequest varDone
  = serve
  where
    -- Wait for the next request and act on it.
    serve = takeMVar varRequest >>= handle

    -- Acknowledge the shutdown; not looping again ends the thread.
    handle ReqShutdown
      = putMVar varDone ()

    -- Run the action, report that it is complete, then wait for more.
    handle (ReqDo action)
      = do action threadId
           putMVar varDone ()
           serve
-- | Finaliser for worker threads.
-- We want to shut down the corresponding thread when its MVar becomes
-- unreachable.
-- Without this Repa programs can complain about "Blocked indefinitely
-- on an MVar" because worker threads are still blocked on the request
-- MVars when the program ends. Whether the finalizer is called or not
-- is very racy. It happens about 1 in 10 runs for the
-- repa-edgedetect benchmark, and less often with the others.
--
-- We're relying on the comment in System.Mem.Weak that says
-- "If there are no other threads to run, the runtime system will
-- check for runnable finalizers before declaring the system to be
-- deadlocked."
--
-- If we were creating and destroying the gang cleanly we wouldn't need
-- this, but theGang is created with a top-level unsafePerformIO.
-- Hacks beget hacks beget hacks...
--
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker varReq varDone
  =  putMVar varReq ReqShutdown   -- ask the worker to shut down ...
  >> takeMVar varDone             -- ... and wait for it to acknowledge
-- | Hand a work request to the 'Gang' and wait until it has completed.
--
--   The action is called once per gang thread, with that thread's index.
--   If the gang is already busy, a warning is printed to @stderr@ and the
--   action is instead run for every index, in order, in the calling thread.
gangIO
  :: Gang
  -> (Int -> IO ())
  -> IO ()
{-# NOINLINE gangIO #-}
gangIO gang@(Gang _ _ _ busy) action
  = swapMVar busy True >>= dispatch
  where
    -- The flag was already set: someone else is using the gang, so fall
    -- back to sequential execution and leave the flag as it is.
    dispatch True
      = seqIO gang action

    -- The gang was idle: run in parallel, then mark it idle again.
    dispatch False
      = parIO gang action >> void (swapMVar busy False)
-- | Sequential fallback: print the nested-parallelism warning to @stderr@,
--   then run the action for each thread index @0 .. n-1@ in the calling
--   thread.
seqIO :: Gang -> (Int -> IO ()) -> IO ()
seqIO (Gang n _ _ _) action
  = do hPutStr stderr (unlines warning)
       forM_ [0 .. n-1] action
  where
    warning =
      [ "Data.Array.Repa: Performing nested parallel computation sequentially."
      , " You've probably called the 'compute' or 'copy' function while another"
      , " instance was already running. This can happen if the second version"
      , " was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure"
      , " that each array is fully evaluated before you 'compute' the next one."
      , ""
      ]
-- | Run an action on all threads of the gang in parallel, returning only
--   after every worker has reported completion.
parIO :: Gang -> (Int -> IO ()) -> IO ()
parIO (Gang _ requestVars resultVars _) action
  = do -- Hand the same request to every worker.
       forM_ requestVars $ \var -> putMVar var request
       -- Collect one completion signal per worker.
       forM_ resultVars takeMVar
  where
    request = ReqDo action
-- | 'gangIO' lifted to the 'ST' monad: each per-thread 'ST' action is
--   converted to 'IO' to be run by the gang, and the whole request is
--   converted back to 'ST'.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST (gangIO g (\ix -> unsafeSTToIO (p ix)))
|