File: Gang.hs

package info (click to toggle)
haskell-repa 3.4.1.5-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 304 kB
  • sloc: haskell: 3,135; makefile: 2
file content (218 lines) | stat: -rw-r--r-- 7,313 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
{-# LANGUAGE CPP #-}

-- | Gang Primitives.
module Data.Array.Repa.Eval.Gang
        ( theGang
        , Gang, forkGang, gangSize, gangIO, gangST)     
where
import GHC.IO
import GHC.ST
import GHC.Conc                 (forkOn)
import Control.Concurrent.MVar
import Control.Exception        (assert)
import Control.Monad
import GHC.Conc                 (numCapabilities)
import System.IO


-- TheGang --------------------------------------------------------------------
-- | The single, process-wide gang shared by every Repa computation. It is
--   forked the first time it is demanded, with one worker per capability
--   (as reported by 'numCapabilities').
--
--   Only one gang is ever useful in a data parallel setting: a single data
--   parallel computation should already keep every thread busy, so running
--   several gangs at once would only make them contend for the caches and
--   thrash the scheduler.
--
--   If you manage to start several parallel Repa computations at the same
--   time (for example because laziness suspended one of them), the nested
--   one is run sequentially and the following warning is printed on stderr:
--
-- @Data.Array.Repa: Performing nested parallel computation sequentially.
--    You've probably called the 'compute' or 'copy' function while another
--    instance was already running. This can happen if the second version
--    was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure
--    that each array is fully evaluated before you 'compute' the next one.
-- @
--
theGang :: Gang
{-# NOINLINE theGang #-}
-- The NOINLINE pragma is what makes this 'unsafePerformIO' safe to use as a
-- global: the definition stays a single top-level value, so the gang is
-- forked once instead of at every site the body could be inlined into.
theGang = unsafePerformIO (forkGang numCapabilities)


-- Requests -------------------------------------------------------------------
-- | The 'Req' type encapsulates work requests for individual members of a gang.
data Req
        -- | Instruct the worker to run the given action. The worker passes
        --   its own index within the gang (0 .. size-1) to the action.
        = ReqDo        (Int -> IO ())

        -- | Tell the worker that we're shutting the gang down.
        --   The worker signals that it has received the request by writing
        --   to its result var before its thread exits, so the sender of the
        --   request (the finaliser installed by 'forkGang') can wait for
        --   that acknowledgement.
        | ReqShutdown


-- Gang -----------------------------------------------------------------------
-- | A 'Gang' is a group of worker threads that execute arbitrary work
--   requests. Worker @i@ listens on the @i@-th request var and answers on
--   the @i@-th result var.
data Gang
        = Gang
        { _gangThreads           :: !Int
          -- ^ How many worker threads the gang has.

        , _gangRequestVars       :: [MVar Req]
          -- ^ One request var per worker; each worker blocks on its own.

        , _gangResultVars        :: [MVar ()]
          -- ^ One result var per worker; filled when its request completes.

        , _gangBusy              :: MVar Bool
          -- ^ Holds 'True' while a parallel computation is using the gang.
        }

-- | Renders a gang as @<<n threads>>@, where @n@ is its size.
instance Show Gang where
  showsPrec prec (Gang threads _ _ _)
        = foldr (.) id
                [ showString "<<"
                , showsPrec prec threads
                , showString " threads>>" ]


-- | O(1). The number of worker threads in the 'Gang'.
gangSize :: Gang -> Int
gangSize = _gangThreads


-- | Fork a 'Gang' with the given number of threads (at least 1).
--
--   Worker @i@ is started with @'forkOn' i@, so it runs on capability @i@
--   (modulo the number of capabilities). The size is only checked with
--   'assert', which GHC ignores when optimising, so callers must not pass
--   a non-positive count.
forkGang :: Int -> IO Gang
forkGang n
 = assert (n > 0)
 $ do
        -- Create the vars we'll use to issue work requests.
        mvsRequest     <- replicateM n newEmptyMVar

        -- Create the vars we'll use to signal that threads are done.
        mvsDone        <- replicateM n newEmptyMVar

        -- Add finalisers so we can shut the workers down cleanly if they
        -- become unreachable. The returned weak pointers are discarded;
        -- we only rely on the finaliser attached to each request var.
        zipWithM_ (\varReq varDone
                        -> mkWeakMVar varReq (finaliseWorker varReq varDone))
                mvsRequest
                mvsDone

        -- Create all the worker threads; worker i gets request/result var i.
        zipWithM_ forkOn [0 ..]
                $ zipWith3 gangWorker
                        [0 .. n - 1] mvsRequest mvsDone

        -- The gang is currently idle.
        busy   <- newMVar False

        return $ Gang n mvsRequest mvsDone busy



-- | The body of one gang worker thread.
--
--   The worker blocks on its request var. For 'ReqDo' it runs the action
--   with its own index, fills its result var and goes back to waiting.
--   For 'ReqShutdown' it fills its result var and the thread ends.
--
--   NOTE(review): if the action throws, the exception ends this thread
--   before the result var is filled, so whoever issued the request will
--   stay blocked waiting for it.
gangWorker :: Int -> MVar Req -> MVar () -> IO ()
gangWorker threadId varRequest varDone
 = loop
 where
  loop = do
    req <- takeMVar varRequest
    case req of
      ReqShutdown ->
        putMVar varDone ()
      ReqDo action -> do
        action threadId
        putMVar varDone ()
        loop


-- | Finaliser for worker threads.
--   We want to shut down the corresponding thread when its request MVar
--   becomes unreachable. The finaliser sends 'ReqShutdown' to the worker
--   and then waits for the worker's acknowledgement on its result var.
--
--   Without this Repa programs can complain about "Blocked indefinitely
--   on an MVar" because worker threads are still blocked on the request
--   MVars when the program ends. Whether the finaliser is called or not
--   is very racy. It happens about 1 in 10 runs of the repa-edgedetect
--   benchmark, and less often with the others.
--
--   We're relying on the comment in System.Mem.Weak that says
--    "If there are no other threads to run, the runtime system will
--     check for runnable finalizers before declaring the system to be
--     deadlocked."
--
--   If we were creating and destroying the gang cleanly we wouldn't need
--     this, but theGang is created with a top-level unsafePerformIO.
--     Hacks beget hacks beget hacks...
--
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker varReq varDone
 = do   putMVar varReq ReqShutdown
        -- Already IO (): no trailing 'return ()' needed.
        takeMVar varDone


-- | Issue work requests for the 'Gang' and wait until they complete.
--   The action is invoked once for each worker index.
--
--   If the gang is already busy then a warning is printed to 'stderr' and
--   the actions are run sequentially in the requesting thread instead.
--
--   NOTE(review): the busy flag is not reset if 'parIO' throws, so later
--   requests would then always take the sequential path.
gangIO  :: Gang
        -> (Int -> IO ())
        -> IO ()

{-# NOINLINE gangIO #-}
gangIO gang@(Gang _ _ _ busyVar) action
 = do   alreadyBusy <- swapMVar busyVar True
        if alreadyBusy
          then seqIO gang action
          else do parIO gang action
                  -- Hand the gang back so later requests can use it.
                  void (swapMVar busyVar False)


-- | Fallback used by 'gangIO' when the gang is busy. It prints a warning to
--   stderr and then runs the action for every worker index, one after
--   another, in the calling thread.
seqIO   :: Gang -> (Int -> IO ()) -> IO ()
seqIO (Gang threads _ _ _) action
 = do   hPutStr stderr (unlines nestedWarning)
        forM_ [0 .. threads - 1] action
 where
        nestedWarning
         = [ "Data.Array.Repa: Performing nested parallel computation sequentially."
           , "  You've probably called the 'compute' or 'copy' function while another"
           , "  instance was already running. This can happen if the second version"
           , "  was suspended due to lazy evaluation. Use 'deepSeqArray' to ensure"
           , "  that each array is fully evaluated before you 'compute' the next one."
           , "" ]

-- | Run an action on the gang in parallel. The same 'ReqDo' request is
--   handed to every worker, and then each worker's completion signal is
--   awaited in turn.
parIO   :: Gang -> (Int -> IO ()) -> IO ()
parIO (Gang _ requestVars resultVars _) action
 = do   forM_ requestVars (`putMVar` request)
        forM_ resultVars takeMVar
 where  request = ReqDo action


-- | Same as 'gangIO' but in the 'ST' monad. The per-index 'ST' action is
--   converted to 'IO' and run on the gang, and the whole computation is
--   converted back to 'ST'.
gangST :: Gang -> (Int -> ST s ()) -> ST s ()
gangST gang stAction
        = unsafeIOToST (gangIO gang (\ix -> unsafeSTToIO (stAction ix)))