1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
{-# LANGUAGE Safe #-}
{-# LANGUAGE BangPatterns #-}
{-# OPTIONS_GHC -funbox-strict-fields #-}
-----------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.QSem
-- Copyright : (c) The University of Glasgow 2001
-- License : BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer : libraries@haskell.org
-- Stability : experimental
-- Portability : non-portable (concurrency)
--
-- Simple quantity semaphores.
--
-----------------------------------------------------------------------------
module Control.Concurrent.QSem
( -- * Simple Quantity Semaphores
QSem, -- abstract
newQSem, -- :: Int -> IO QSem
waitQSem, -- :: QSem -> IO ()
signalQSem -- :: QSem -> IO ()
) where
import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
, putMVar, newMVar, tryPutMVar)
import Control.Exception
import Data.Maybe
-- | 'QSem' is a quantity semaphore in which the resource is aqcuired
-- and released in units of one. It provides guaranteed FIFO ordering
-- for satisfying blocked `waitQSem` calls.
--
-- The pattern
--
-- > bracket_ waitQSem signalQSem (...)
--
-- is safe; it never loses a unit of the resource.
--
data QSem = QSem !(MVar (Int, [MVar ()], [MVar ()]))
-- The semaphore state (i, xs, ys):
--
-- i is the current resource value
--
-- (xs,ys) is the queue of blocked threads, where the queue is
-- given by xs ++ reverse ys. We can enqueue new blocked threads
-- by consing onto ys, and dequeue by removing from the head of xs.
--
-- A blocked thread is represented by an empty (MVar ()). To unblock
-- the thread, we put () into the MVar.
--
-- A thread can dequeue itself by also putting () into the MVar, which
-- it must do if it receives an exception while blocked in waitQSem.
-- This means that when unblocking a thread in signalQSem we must
-- first check whether the MVar is already full; the MVar lock on the
-- semaphore itself resolves race conditions between signalQSem and a
-- thread attempting to dequeue itself.
-- |Build a new 'QSem' with a supplied initial quantity.
-- The initial quantity must be at least 0.
newQSem :: Int -> IO QSem
newQSem initial
| initial < 0 = fail "newQSem: Initial quantity must be non-negative"
| otherwise = do
sem <- newMVar (initial, [], [])
return (QSem sem)
-- |Wait for a unit to become available
waitQSem :: QSem -> IO ()
waitQSem (QSem m) =
mask_ $ do
(i,b1,b2) <- takeMVar m
if i == 0
then do
b <- newEmptyMVar
putMVar m (i, b1, b:b2)
wait b
else do
let !z = i-1
putMVar m (z, b1, b2)
return ()
where
wait b = takeMVar b `onException` do
(uninterruptibleMask_ $ do -- Note [signal uninterruptible]
(i,b1,b2) <- takeMVar m
r <- tryTakeMVar b
r' <- if isJust r
then signal (i,b1,b2)
else do putMVar b (); return (i,b1,b2)
putMVar m r')
-- |Signal that a unit of the 'QSem' is available
signalQSem :: QSem -> IO ()
signalQSem (QSem m) =
uninterruptibleMask_ $ do -- Note [signal uninterruptible]
r <- takeMVar m
r' <- signal r
putMVar m r'
-- Note [signal uninterruptible]
--
-- If we have
--
-- bracket waitQSem signalQSem (...)
--
-- and an exception arrives at the signalQSem, then we must not lose
-- the resource. The signalQSem is masked by bracket, but taking
-- the MVar might block, and so it would be interruptible. Hence we
-- need an uninterruptibleMask here.
--
-- This isn't ideal: during high contention, some threads won't be
-- interruptible. The QSemSTM implementation has better behaviour
-- here, but it performs much worse than this one in some
-- benchmarks.
signal :: (Int,[MVar ()],[MVar ()]) -> IO (Int,[MVar ()],[MVar ()])
signal (i,a1,a2) =
if i == 0
then loop a1 a2
else let !z = i+1 in return (z, a1, a2)
where
loop [] [] = return (1, [], [])
loop [] b2 = loop (reverse b2) []
loop (b:bs) b2 = do
r <- tryPutMVar b ()
if r then return (0, bs, b2)
else loop bs b2
|