File: Semaphore.hs

package info (click to toggle)
haskell-cabal-install 1.20.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 1,324 kB
  • ctags: 10
  • sloc: haskell: 18,563; sh: 225; ansic: 36; makefile: 6
file content (104 lines) | stat: -rw-r--r-- 2,965 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
{-# LANGUAGE DeriveDataTypeable #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}
module Distribution.Client.Compat.Semaphore
    ( QSem
    , newQSem
    , waitQSem
    , signalQSem
    ) where

import Control.Concurrent.STM (TVar, atomically, newTVar, readTVar, retry,
                               writeTVar)
import Control.Exception (mask_, onException)
import Control.Monad (join, when)
import Data.Typeable (Typeable)

-- | 'QSem' is a quantity semaphore in which the resource is aqcuired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked `waitQSem` calls.
--
data QSem = QSem !(TVar Int) !(TVar [TVar Bool]) !(TVar [TVar Bool])
  deriving (Eq, Typeable)

newQSem :: Int -> IO QSem
newQSem i = atomically $ do
  q <- newTVar i
  b1 <- newTVar []
  b2 <- newTVar []
  return (QSem q b1 b2)

waitQSem :: QSem -> IO ()
waitQSem s@(QSem q _b1 b2) =
  mask_ $ join $ atomically $ do
        -- join, because if we need to block, we have to add a TVar to
        -- the block queue.
        -- mask_, because we need a chance to set up an exception handler
        -- after the join returns.
     v <- readTVar q
     if v == 0
        then do b <- newTVar False
                ys <- readTVar b2
                writeTVar b2 (b:ys)
                return (wait b)
        else do writeTVar q $! v - 1
                return (return ())
  where
    --
    -- very careful here: if we receive an exception, then we need to
    --  (a) write True into the TVar, so that another signalQSem doesn't
    --      try to wake up this thread, and
    --  (b) if the TVar is *already* True, then we need to do another
    --      signalQSem to avoid losing a unit of the resource.
    --
    -- The 'wake' function does both (a) and (b), so we can just call
    -- it here.
    --
    wait t =
      flip onException (wake s t) $
      atomically $ do
        b <- readTVar t
        when (not b) retry


wake :: QSem -> TVar Bool -> IO ()
wake s x = join $ atomically $ do
      b <- readTVar x
      if b then return (signalQSem s)
           else do writeTVar x True
                   return (return ())

{-
 property we want:

   bracket waitQSem (\_ -> signalQSem) (\_ -> ...)

 never loses a unit of the resource.
-}

signalQSem :: QSem -> IO ()
signalQSem s@(QSem q b1 b2) =
  mask_ $ join $ atomically $ do
      -- join, so we don't force the reverse inside the txn
      -- mask_ is needed so we don't lose a wakeup
    v <- readTVar q
    if v /= 0
       then do writeTVar q $! v + 1
               return (return ())
       else do xs <- readTVar b1
               checkwake1 xs
  where
    checkwake1 [] = do
      ys <- readTVar b2
      checkwake2 ys
    checkwake1 (x:xs) = do
      writeTVar b1 xs
      return (wake s x)

    checkwake2 [] = do
      writeTVar q 1
      return (return ())
    checkwake2 ys = do
      let (z:zs) = reverse ys
      writeTVar b1 zs
      writeTVar b2 []
      return (wake s z)