{-# LANGUAGE CPP, DeriveDataTypeable, NoImplicitPrelude #-}
#if __GLASGOW_HASKELL__ >= 704
{-# LANGUAGE Safe #-}
#endif
-------------------------------------------------------------------------------
-- |
-- Module : Control.Concurrent.Broadcast
-- Copyright : (c) 2010-2011 Bas van Dijk & Roel van Dijk
-- License : BSD3 (see the file LICENSE)
-- Maintainer : Bas van Dijk <v.dijk.bas@gmail.com>
-- , Roel van Dijk <vandijk.roel@gmail.com>
--
-- A 'Broadcast' is a mechanism for communication between threads. Multiple
-- @'listen'ers@ wait until a broadcaster @'broadcast's@ a value. The listeners
-- block until the value is received. When the broadcaster broadcasts a value
-- all listeners are woken.
--
-- All functions are /exception safe/. Throwing asynchronous exceptions will not
-- compromise the internal state of a broadcast.
--
-- This module is designed to be imported qualified. We suggest importing it
-- like:
--
-- @
-- import Control.Concurrent.Broadcast ( Broadcast )
-- import qualified Control.Concurrent.Broadcast as Broadcast ( ... )
-- @
-------------------------------------------------------------------------------
module Control.Concurrent.Broadcast
( Broadcast
-- * Creating broadcasts
, new
, newBroadcasting
-- * Listening to broadcasts
, listen
, tryListen
, listenTimeout
-- * Broadcasting
, broadcast
, signal
, silence
) where
-------------------------------------------------------------------------------
-- Imports
-------------------------------------------------------------------------------
-- from base:
import Control.Monad ( return, when )
import Control.Concurrent.MVar ( MVar, newMVar, newEmptyMVar
, takeMVar, putMVar, readMVar, modifyMVar_
)
import Control.Exception ( onException )
import Data.Eq ( Eq )
import Data.Either ( Either(Left ,Right), either )
import Data.Function ( ($), (.), const )
import Data.Functor ( fmap, (<$>) )
import Data.Foldable ( for_ )
import Data.List ( delete, length )
import Data.Maybe ( Maybe(Nothing, Just), isNothing )
import Data.Ord ( max )
import Data.Typeable ( Typeable )
import Prelude ( Integer, seq )
import System.IO ( IO )
#if __GLASGOW_HASKELL__ < 700
import Prelude ( fromInteger )
import Control.Monad ( (>>=), (>>), fail )
import Data.Ord ( Ord )
#endif
-- from unbounded-delays:
import Control.Concurrent.Timeout ( timeout )
-- from concurrent-extra (this package):
import Utils ( purelyModifyMVar, mask_ )
-------------------------------------------------------------------------------
-- Broadcast
-------------------------------------------------------------------------------
{-|
A broadcast is always in exactly one of two states:

* \"Silent\": no value is available, so @'listen'ing@ to the broadcast blocks
  until a value is @'broadcast'ed@.

* \"Broadcasting @x@\": the value @x@ is available, so @'listen'ing@ to the
  broadcast returns @x@ without blocking.
-}
newtype Broadcast a = Broadcast
  { -- The current state. @Left ls@ is the silent state, where @ls@ holds one
    -- 'MVar' for every listener that is waiting for a value; @Right x@ is the
    -- state in which @x@ is being broadcast.
    unBroadcast :: MVar (Either [MVar a] a)
  }
  deriving (Eq, Typeable)
-- | Create a fresh broadcast in the \"silent\" state: no value is being
-- broadcast and no listener is registered yet.
new :: IO (Broadcast a)
new = Broadcast <$> newMVar silent
  where
    -- The silent state with an empty list of listeners.
    silent = Left []
-- | Create a fresh broadcast that is already \"broadcasting\" the given value,
-- so that @'listen'ing@ to it returns that value without blocking.
newBroadcasting :: a -> IO (Broadcast a)
newBroadcasting = fmap Broadcast . newMVar . Right
{-|
Listen to a broadcast.

* If the broadcast is \"broadcasting @x@\", @listen@ will return @x@
  immediately.

* If the broadcast is \"silent\", @listen@ will block until another thread
  @'broadcast's@ a value to the broadcast.

If the listening thread receives an exception while it is blocked, it removes
itself from the broadcast's list of listeners before the exception is
propagated, so interrupted listeners do not accumulate in a broadcast that
stays \"silent\".
-}
listen :: Broadcast a -> IO a
listen (Broadcast mv) = mask_ $ do
    mx <- takeMVar mv
    case mx of
      Left ls -> do
        -- Register a private, empty MVar that the broadcaster will fill.
        l <- newEmptyMVar
        putMVar mv $ Left $ l:ls
        -- Wait for the value; deregister again if the wait is interrupted,
        -- just as 'listenTimeout' does.
        takeMVar l `onException` deleteReader l
      Right x -> do
        putMVar mv mx
        return x
  where
    -- Remove the listener @l@ from the list kept in the silent state. If the
    -- broadcast is no longer silent there is no list to prune, so the state is
    -- put back unchanged.
    deleteReader l = do
      mx <- takeMVar mv
      case mx of
        Left ls -> let ls' = delete l ls
                   -- Force the spine of the pruned list so that no chain of
                   -- pending deletions is stored inside the MVar.
                   in length ls' `seq` putMVar mv (Left ls')
        Right _ -> putMVar mv mx
{-|
Non-blocking variant of 'listen'.

* If the broadcast is \"broadcasting @x@\", @tryListen@ returns 'Just' @x@
  immediately.

* If the broadcast is \"silent\", @tryListen@ returns 'Nothing' immediately.
-}
tryListen :: Broadcast a -> IO (Maybe a)
tryListen = fmap broadcastValue . readMVar . unBroadcast
  where
    -- The silent state carries no value; the broadcasting state carries one.
    broadcastValue = either (const Nothing) Just
{-|
Like 'listen', but gives up after a timeout.

* If the broadcast is \"broadcasting @x@\", @listenTimeout@ returns 'Just' @x@
  immediately.

* If the broadcast is \"silent\", @listenTimeout@ waits for a value to be
  @'broadcast'ed@. It returns 'Just' that value if it arrives in time and
  'Nothing' if the timeout occurs first.

The timeout is specified in microseconds. Negative timeouts are treated the
same as a timeout of 0 μs, and with a timeout of 0 μs a \"silent\" broadcast
makes the function return 'Nothing' without blocking.
-}
listenTimeout :: Broadcast a -> Integer -> IO (Maybe a)
listenTimeout (Broadcast mv) time = mask_ $ do
    state <- takeMVar mv
    case state of
      Right x -> do
        putMVar mv state
        return $ Just x
      Left listeners -> do
        -- Register a private, empty MVar that the broadcaster will fill.
        slot <- newEmptyMVar
        putMVar mv $ Left $ slot : listeners
        -- Wait for the value. The listener is deregistered both when the
        -- wait is interrupted by an exception and when it times out.
        result <- timeout (max time 0) (takeMVar slot)
                    `onException` unregister slot
        when (isNothing result) (unregister slot)
        return result
  where
    -- Remove @slot@ from the list of listeners kept in the silent state. If
    -- the broadcast is no longer silent, the state is put back unchanged.
    unregister slot = do
      state <- takeMVar mv
      case state of
        Right _ -> putMVar mv state
        Left listeners ->
          let remaining = delete slot listeners
          -- Force the spine of the pruned list before storing it.
          in length remaining `seq` putMVar mv (Left remaining)
{-|
Broadcast a value.

@broadcast b x@ puts the broadcast @b@ into the \"broadcasting @x@\" state. If
@b@ was \"silent\", every thread that is @'listen'ing@ to it is woken with @x@.
-}
broadcast :: Broadcast a -> a -> IO ()
broadcast b x = broadcastThen nowBroadcasting b x
  where
    -- The state the broadcast is left in: the value stays available.
    nowBroadcasting = Right x

{-|
Broadcast a value, then become \"silent\" again.

All threads that are @'listen'ing@ to the broadcast are woken and resume with
the signalled value, after which the broadcast is left in the \"silent\"
state.

The effect is that of the following definition, carried out as a single
modification of the broadcast's state:

@
signal b x = 'broadcast' b x >> 'silence' b
@
-}
signal :: Broadcast a -> a -> IO ()
signal = broadcastThen (Left [])
-- | Shared implementation of 'broadcast' and 'signal'.
--
-- If the broadcast is \"silent\", the value is handed to every registered
-- listener. In either case the broadcast is then put into the given final
-- state. Both steps happen inside a single 'modifyMVar_' on the state.
broadcastThen :: Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen finalState (Broadcast mv) x =
    modifyMVar_ mv $ \state -> do
      wake state
      return finalState
  where
    -- Fill the MVar of every waiting listener; nothing to do when the
    -- broadcast already holds a value.
    wake (Left listeners) = for_ listeners (`putMVar` x)
    wake (Right _)        = return ()
-- | Put a broadcast into the \"silent\" state.
--
-- A broadcast that is already silent is left as it is, so its registered
-- listeners are kept. A broadcast that holds a value becomes silent with no
-- listeners.
silence :: Broadcast a -> IO ()
silence (Broadcast mv) = purelyModifyMVar mv hush
  where
    hush alreadySilent@(Left _) = alreadySilent
    hush (Right _)              = Left []