1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
|
-- |Implements bounded channels. These channels differ from normal 'Chan's in
-- that they are guaranteed to contain no more than a certain number of
-- elements. This is ideal when you may be writing to a channel faster than you
-- are able to read from it.
--
-- This module supports all the functions of "Control.Concurrent.Chan" except
-- 'unGetChan' and 'dupChan', which are not supported for bounded channels.
--
-- Extra consitency: This version enforces that if thread Alice writes
-- e1 followed by e2 then e1 will be returned by readChan before e2.
-- Conversely, if thead Bob reads e1 followed by e2 then it was true that
-- writeChan e1 preceded writeChan e2.
--
-- Previous versions did not enforce this consistency: if writeChan were
-- preempted between putMVars or killThread arrived between putMVars then it
-- can fail. Similarly it might fail if readChan were stopped after putMVar
-- and before the second takeMVar. An unlucky pattern of several such deaths
-- might actually break the invariants of the array in an unrecoverable way
-- causing all future reads and writes to block.
module Control.Concurrent.BoundedChan(
BoundedChan
, newBoundedChan
, writeChan
, tryWriteChan
, readChan
, tryReadChan
, isEmptyChan
, getChanContents
, writeList2Chan
)
where
import Control.Concurrent.MVar (MVar, isEmptyMVar, newEmptyMVar, newMVar,
putMVar, tryPutMVar, takeMVar, tryTakeMVar)
import Control.Exception (mask_, onException)
import Control.Monad (replicateM)
import Data.Array (Array, (!), listArray)
import System.IO.Unsafe (unsafeInterleaveIO)
-- |'BoundedChan' is an abstract data type representing a bounded channel.
data BoundedChan a = BC {
_size :: Int
, _contents :: Array Int (MVar a)
, _writePos :: MVar Int
, _readPos :: MVar Int
}
-- Versions of modifyMVar and withMVar that do not 'restore' the previous mask state when running
-- 'io', with added modification strictness. The lack of 'restore' may make these perform better
-- than the normal version. Moving strictness here makes using them more pleasant.
{-# INLINE modifyMVar_mask #-}
modifyMVar_mask :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar_mask m io =
mask_ $ do
a <- takeMVar m
(a',b) <- io a `onException` putMVar m a
putMVar m $! a'
return b
{-# INLINE modifyMVar_mask_ #-}
modifyMVar_mask_ :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_mask_ m io =
mask_ $ do
a <- takeMVar m
a' <- io a `onException` putMVar m a
putMVar m $! a'
{-# INLINE withMVar_mask #-}
withMVar_mask :: MVar a -> (a -> IO b) -> IO b
withMVar_mask m io =
mask_ $ do
a <- takeMVar m
b <- io a `onException` putMVar m a
putMVar m a
return b
-- |@newBoundedChan n@ returns a channel than can contain no more than @n@
-- elements.
newBoundedChan :: Int -> IO (BoundedChan a)
newBoundedChan x = do
entls <- replicateM x newEmptyMVar
wpos <- newMVar 0
rpos <- newMVar 0
let entries = listArray (0, x - 1) entls
return (BC x entries wpos rpos)
-- |Write an element to the channel. If the channel is full, this routine will
-- block until it is able to write. Blockers wait in a fair FIFO queue.
writeChan :: BoundedChan a -> a -> IO ()
writeChan (BC size contents wposMV _) x = modifyMVar_mask_ wposMV $
\wpos -> do
putMVar (contents ! wpos) x
return ((succ wpos) `mod` size) -- only advance when putMVar succeeds
-- |A variant of 'writeChan' which, instead of blocking when the channel is
-- full, simply aborts and does not write the element. Note that this routine
-- can still block while waiting for write access to the channel.
tryWriteChan :: BoundedChan a -> a -> IO Bool
tryWriteChan (BC size contents wposMV _) x = modifyMVar_mask wposMV $
\wpos -> do
success <- tryPutMVar (contents ! wpos) x
return $ if success
then ((succ wpos) `mod` size, True) -- only advance when putMVar succeeds
else (wpos, False)
-- |Read an element from the channel. If the channel is empty, this routine
-- will block until it is able to read. Blockers wait in a fair FIFO queue.
readChan :: BoundedChan a -> IO a
readChan (BC size contents _ rposMV) = modifyMVar_mask rposMV $
\rpos -> do
a <- takeMVar (contents ! rpos)
return ((succ rpos) `mod` size, a) -- only advance when takeMVar succeeds
-- |A variant of 'readChan' which, instead of blocking when the channel is
-- empty, immediately returns 'Nothing'. Otherwise, 'tryReadChan' returns
-- @'Just' a@ where @a@ is the element read from the channel. Note that this
-- routine can still block while waiting for read access to the channel.
tryReadChan :: BoundedChan a -> IO (Maybe a)
tryReadChan (BC size contents _ rposMV) = modifyMVar_mask rposMV $
\rpos -> do
ma <- tryTakeMVar (contents ! rpos)
return $ case ma of
Just a -> ((succ rpos) `mod` size, Just a) -- only advance when takeMVar succeeds
Nothing -> (rpos, Nothing)
-- |DANGER: This may block on an empty channel if there is already a blocked reader.
-- Returns 'True' if the supplied channel is empty.
--
-- DEPRECATED
{-# DEPRECATED isEmptyChan "This isEmptyChan can block, no non-blocking substitute yet" #-}
isEmptyChan :: BoundedChan a -> IO Bool
isEmptyChan (BC _ contents _ rposMV) = withMVar_mask rposMV $
\rpos -> isEmptyMVar (contents ! rpos)
-- |Return a lazy list representing the contents of the supplied channel. Competing
-- readers might steal from this list.
getChanContents :: BoundedChan a -> IO [a]
getChanContents ch = unsafeInterleaveIO $ do
x <- readChan ch
xs <- getChanContents ch
return (x:xs)
-- |Write a list of elements to the channel. If the channel becomes full, this
-- routine will block until it is able to write. Competing writers may interleave with
-- this one.
writeList2Chan :: BoundedChan a -> [a] -> IO ()
writeList2Chan = mapM_ . writeChan
|