File: JobControl.hs

package info (click to toggle)
haskell-cabal-install 1.20.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,324 kB
  • ctags: 10
  • sloc: haskell: 18,563; sh: 225; ansic: 36; makefile: 6
file content (89 lines) | stat: -rw-r--r-- 2,290 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
-----------------------------------------------------------------------------
-- |
-- Module      :  Distribution.Client.JobControl
-- Copyright   :  (c) Duncan Coutts 2012
-- License     :  BSD-like
--
-- Maintainer  :  cabal-devel@haskell.org
-- Stability   :  provisional
-- Portability :  portable
--
-- A job control concurrency abstraction
-----------------------------------------------------------------------------
module Distribution.Client.JobControl (
    JobControl,
    newSerialJobControl,
    newParallelJobControl,
    spawnJob,
    collectJob,

    JobLimit,
    newJobLimit,
    withJobLimit,

    Lock,
    newLock,
    criticalSection
  ) where

import Control.Monad
import Control.Concurrent hiding (QSem, newQSem, waitQSem, signalQSem)
import Control.Exception (SomeException, bracket_, mask, throw, throwIO, try)
import Distribution.Client.Compat.Semaphore

-- | A pair of operations for submitting jobs and retrieving their results.
--
-- The type parameter @m@ is the monad the jobs run in and @a@ is the result
-- type shared by every job handled by this controller.
data JobControl m a = JobControl
  { -- | Hand a job over to the controller.
    spawnJob   :: m a -> m ()
    -- | Obtain the result of one previously spawned job.
  , collectJob :: m a
  }


-- | Create a 'JobControl' that executes jobs one at a time.
--
-- 'spawnJob' only enqueues the action on a channel; the action is actually
-- run by 'collectJob', in the thread that calls 'collectJob', in the order
-- the jobs were spawned.
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = fmap mkControl newChan
  where
    -- Build the controller around the channel of pending (not yet run) jobs.
    mkControl :: Chan (IO a) -> JobControl IO a
    mkControl pending = JobControl {
      spawnJob   = \job -> writeChan pending job,
      -- Dequeue the next job and run it right here.
      collectJob = join (readChan pending)
    }

-- | Create a 'JobControl' that runs every spawned job in its own thread.
--
-- All jobs share a single result 'MVar': each worker thread stores its
-- outcome there when it finishes, and 'collectJob' takes one outcome out.
-- Results are therefore handed back in the order the jobs deposit them, not
-- necessarily the order they were spawned.
--
-- If a job terminated with an exception, 'collectJob' re-throws that
-- exception in the collecting thread.
newParallelJobControl :: IO (JobControl IO a)
newParallelJobControl = do
    resultVar <- newEmptyMVar
    return JobControl {
      spawnJob   = spawn resultVar,
      collectJob = collect resultVar
    }
  where
    spawn :: MVar (Either SomeException a) -> IO a -> IO ()
    spawn resultVar job =
      -- Fork with async exceptions masked so the worker cannot be killed
      -- between finishing the job and recording its outcome; the job itself
      -- runs with the caller's masking state restored.
      mask $ \restore ->
        void $ forkIO $ do
          res <- try (restore job)
          putMVar resultVar res

    collect :: MVar (Either SomeException a) -> IO a
    collect resultVar =
      -- 'throwIO' (rather than the pure 'throw') raises the exception as a
      -- properly sequenced IO action.
      takeMVar resultVar >>= either throwIO return

-- | A limit on how many actions may run at once, represented by a semaphore.
--
-- Declared as a @newtype@ (like 'Lock') since it merely wraps a single
-- 'QSem' and needs no runtime constructor of its own.
newtype JobLimit = JobLimit QSem

-- | Create a 'JobLimit' backed by a fresh semaphore initialised with @n@
-- (presumably the maximum number of actions allowed to run concurrently
-- under 'withJobLimit' -- depends on the semantics of 'newQSem').
newJobLimit :: Int -> IO JobLimit
newJobLimit n = do
  sem <- newQSem n
  return (JobLimit sem)

-- | Run an action while holding one unit of the given 'JobLimit'.
--
-- The semaphore is waited on before the action starts and signalled again
-- afterwards, whether the action returns normally or throws.
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) action = bracket_ acquire release action
  where
    acquire = waitQSem sem
    release = signalQSem sem

-- | A simple mutual-exclusion lock, represented by an 'MVar' holding a unit
-- value: the 'MVar' is full while the lock is free and empty while it is held.
newtype Lock = Lock (MVar ())

-- | Create a new 'Lock' in the unlocked state (its 'MVar' starts out full).
newLock :: IO Lock
newLock = do
  var <- newMVar ()
  return (Lock var)

-- | Run an action while holding the 'Lock'.
--
-- The lock is taken before the action runs and is put back afterwards, both
-- on normal completion and when the action throws an exception.
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock lck) act =
  -- 'withMVar' takes the unit token, runs the action, and always restores
  -- the token; the token itself carries no information, so it is ignored.
  withMVar lck (\_ -> act)