1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
-----------------------------------------------------------------------------
-- |
-- Module : Distribution.Client.JobControl
-- Copyright : (c) Duncan Coutts 2012
-- License : BSD-like
--
-- Maintainer : cabal-devel@haskell.org
-- Stability : provisional
-- Portability : portable
--
-- A job control concurrency abstraction
-----------------------------------------------------------------------------
module Distribution.Client.JobControl (
JobControl,
newSerialJobControl,
newParallelJobControl,
spawnJob,
collectJob,
JobLimit,
newJobLimit,
withJobLimit,
Lock,
newLock,
criticalSection
) where
import Control.Concurrent hiding (QSem, newQSem, waitQSem, signalQSem)
import Control.Exception (SomeException, bracket_, mask, throw, throwIO, try)
import Control.Monad

import Distribution.Client.Compat.Semaphore
-- | A handle onto a job scheduler running in monad @m@ whose jobs all produce
-- results of type @a@.
--
-- The record only fixes the interface; when and where a job actually executes
-- is decided by whichever function built the value (see
-- 'newSerialJobControl' and 'newParallelJobControl').
data JobControl m a = JobControl
  { spawnJob   :: m a -> m ()
    -- ^ Hand a job over to the scheduler.
  , collectJob :: m a
    -- ^ Obtain the result of one previously spawned job.
  }
-- | Build a 'JobControl' that executes jobs one at a time, without forking.
--
-- 'spawnJob' merely appends the job to an internal 'Chan'; nothing runs at
-- that point. 'collectJob' removes the oldest queued job and runs it in the
-- calling thread, so jobs execute in the order they were spawned. Calling
-- 'collectJob' when no job is queued blocks in 'readChan' until one is
-- spawned.
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = fmap serialOver newChan
  where
    -- Wire both operations to the same queue of not-yet-run jobs.
    serialOver :: Chan (IO a) -> JobControl IO a
    serialOver pending = JobControl
      { spawnJob   = writeChan pending
        -- 'readChan' yields the queued action itself; 'join' then runs it.
      , collectJob = join (readChan pending)
      }
-- | Build a 'JobControl' that runs every spawned job in its own thread.
--
-- 'spawnJob' forks a worker with 'forkIO' and returns immediately. The
-- worker's outcome, either the job's result or the exception it raised, is
-- put into a single 'MVar' shared by all workers. 'collectJob' takes one
-- outcome from that 'MVar', returning the result or re-raising the job's
-- exception in the collecting thread.
--
-- Because there is only one 'MVar', a finished worker blocks in 'putMVar'
-- until the previous outcome has been collected, and outcomes are not
-- necessarily delivered in spawn order. This function puts no bound on the
-- number of threads forked.
newParallelJobControl :: IO (JobControl IO a)
newParallelJobControl = do
    resultVar <- newEmptyMVar
    return JobControl {
      spawnJob   = spawn resultVar,
      collectJob = collect resultVar
    }
  where
    spawn :: MVar (Either SomeException a) -> IO a -> IO ()
    spawn resultVar job =
      -- Fork with asynchronous exceptions masked: the worker inherits the
      -- masked state, and 'restore' re-enables them only around the job
      -- itself, so whatever the job does is captured by 'try' before the
      -- outcome is handed over.
      mask $ \restore ->
        void $ forkIO $ do
          res <- try (restore job)
          putMVar resultVar res

    collect :: MVar (Either SomeException a) -> IO a
    collect resultVar =
      -- 'throwIO' rather than 'throw': the exception is raised as an IO
      -- action, sequenced after the 'takeMVar'.
      takeMVar resultVar >>= either throwIO return
-- | A cap on how many actions may run at the same time, represented by a
-- 'QSem'.
--
-- Declared as a @newtype@ (like 'Lock') because it wraps exactly one field.
newtype JobLimit = JobLimit QSem

-- | Create a 'JobLimit' whose semaphore is initialised with @n@ units.
newJobLimit :: Int -> IO JobLimit
newJobLimit n =
  fmap JobLimit (newQSem n)

-- | Run an action while holding one unit of the limit.
--
-- Waits on the semaphore before the action starts and signals it again
-- afterwards. 'bracket_' guarantees the signal also happens when the action
-- exits with an exception.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) =
  bracket_ (waitQSem sem) (signalQSem sem)
-- | A mutual-exclusion lock built on an @'MVar' ()@: the 'MVar' is full
-- while the lock is free and empty while it is held.
newtype Lock = Lock (MVar ())

-- | Create a new lock, initially free.
newLock :: IO Lock
newLock = do
  mutex <- newMVar ()
  return (Lock mutex)

-- | Run an action while holding the lock.
--
-- The lock is released again when the action finishes, whether it returns
-- normally or raises an exception. The lock is not re-entrant: entering a
-- nested 'criticalSection' on the same 'Lock' from inside the action blocks
-- on the already-empty 'MVar'.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock mutex) = bracket_ acquire release
  where
    acquire = takeMVar mutex
    release = putMVar mutex ()
|