
|
{-# LANGUAGE CPP #-}
-- | Gang Primitives.
module Data.Array.Repa.Eval.Gang
( theGang
, Gang, forkGang, gangSize, gangIO, gangST)
where
import GHC.IO
import GHC.ST
import GHC.Conc (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
import GHC.Conc (numCapabilities)
import System.IO
-- TheGang --------------------------------------------------------------------
-- | This globally shared gang is auto-initialised at startup and shared by all
-- Repa computations.
--
-- In a data parallel setting, it does not help to have multiple gangs
-- running at the same time. This is because a single data parallel
-- computation should already be able to keep all threads busy. If we had
-- multiple gangs running at the same time, then the system as a whole would
-- run slower as the gangs would contend for cache and thrash the scheduler.
--
-- If, due to laziness or otherwise, you try to start multiple parallel
-- Repa computations at the same time, then you will get the following
-- warning on stderr at runtime:
--
-- @Data.Array.Repa: Performing nested parallel computation sequentially.
-- You've probably called the 'compute' or 'copy' function while another
-- instance was already running. This can happen if the second version
-- was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure that
-- each array is fully evaluated before you 'compute' the next one.
-- @
--
theGang :: Gang
{-# NOINLINE theGang #-}
theGang
= unsafePerformIO
$ do let caps = numCapabilities
forkGang caps
-- Requests -------------------------------------------------------------------
-- | The 'Req' type encapsulates work requests for individual members of a gang.
data Req
-- | Instruct the worker to run the given action.
= ReqDo (Int -> IO ())
-- | Tell the worker that we're shutting the gang down.
-- The worker should signal that it's receieved the request by
-- writing to its result var before returning to the caller (forkGang).
| ReqShutdown
-- Gang -----------------------------------------------------------------------
-- | A 'Gang' is a group of threads that execute arbitrary work requests.
data Gang
= Gang
{ -- | Number of threads in the gang.
_gangThreads :: !Int
-- | Workers listen for requests on these vars.
, _gangRequestVars :: [MVar Req]
-- | Workers put their results in these vars.
, _gangResultVars :: [MVar ()]
-- | Indicates that the gang is busy.
, _gangBusy :: MVar Bool
}
instance Show Gang where
showsPrec p (Gang n _ _ _)
= showString "<<"
. showsPrec p n
. showString " threads>>"
-- | O(1). Yield the number of threads in the 'Gang'.
gangSize :: Gang -> Int
gangSize (Gang n _ _ _)
= n
-- | Fork a 'Gang' with the given number of threads (at least 1).
forkGang :: Int -> IO Gang
forkGang n
= assert (n > 0)
$ do
-- Create the vars we'll use to issue work requests.
mvsRequest <- sequence $ replicate n $ newEmptyMVar
-- Create the vars we'll use to signal that threads are done.
mvsDone <- sequence $ replicate n $ newEmptyMVar
-- Add finalisers so we can shut the workers down cleanly if they
-- become unreachable.
zipWithM_ (\varReq varDone
-> mkWeakMVar varReq (finaliseWorker varReq varDone))
mvsRequest
mvsDone
-- Create all the worker threads
zipWithM_ forkOn [0..]
$ zipWith3 gangWorker
[0 .. n-1] mvsRequest mvsDone
-- The gang is currently idle.
busy <- newMVar False
return $ Gang n mvsRequest mvsDone busy
-- | The worker thread of a 'Gang'.
-- The threads blocks on the MVar waiting for a work request.
gangWorker :: Int -> MVar Req -> MVar () -> IO ()
gangWorker threadId varRequest varDone
= do
-- Wait for a request
req <- takeMVar varRequest
case req of
ReqDo action
-> do -- Run the action we were given.
action threadId
-- Signal that the action is complete.
putMVar varDone ()
-- Wait for more requests.
gangWorker threadId varRequest varDone
ReqShutdown
-> putMVar varDone ()
-- | Finaliser for worker threads.
-- We want to shutdown the corresponding thread when it's MVar becomes
-- unreachable.
-- Without this Repa programs can complain about "Blocked indefinitely
-- on an MVar" because worker threads are still blocked on the request
-- MVars when the program ends. Whether the finalizer is called or not
-- is very racey. It happens about 1 in 10 runs when for the
-- repa-edgedetect benchmark, and less often with the others.
--
-- We're relying on the comment in System.Mem.Weak that says
-- "If there are no other threads to run, the runtime system will
-- check for runnablefinalizers before declaring the system to be
-- deadlocked."
--
-- If we were creating and destroying the gang cleanly we wouldn't need
-- this, but theGang is created with a top-level unsafePerformIO.
-- Hacks beget hacks beget hacks...
--
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker varReq varDone
= do putMVar varReq ReqShutdown
takeMVar varDone
return ()
-- | Issue work requests for the 'Gang' and wait until they complete.
--
-- If the gang is already busy then print a warning to `stderr` and just
-- run the actions sequentially in the requesting thread.
gangIO :: Gang
-> (Int -> IO ())
-> IO ()
{-# NOINLINE gangIO #-}
gangIO gang@(Gang _ _ _ busy) action
= do b <- swapMVar busy True
if b
then do
seqIO gang action
else do
parIO gang action
_ <- swapMVar busy False
return ()
-- | Run an action on the gang sequentially.
seqIO :: Gang -> (Int -> IO ()) -> IO ()
seqIO (Gang n _ _ _) action
= do hPutStr stderr
$ unlines
[ "Data.Array.Repa: Performing nested parallel computation sequentially."
, " You've probably called the 'compute' or 'copy' function while another"
, " instance was already running. This can happen if the second version"
, " was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure"
, " that each array is fully evaluated before you 'compute' the next one."
, "" ]
mapM_ action [0 .. n-1]
-- | Run an action on the gang in parallel.
parIO :: Gang -> (Int -> IO ()) -> IO ()
parIO (Gang _ mvsRequest mvsResult _) action
= do
-- Send requests to all the threads.
mapM_ (\v -> putMVar v (ReqDo action)) mvsRequest
-- Wait for all the requests to complete.
mapM_ takeMVar mvsResult
-- | Same as 'gangIO' but in the 'ST' monad.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST g p = unsafeIOToST . gangIO g $ unsafeSTToIO . p
|