File: Broadcast.hs

package info (click to toggle)
haskell-concurrent-extra 0.7.0.12-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 184 kB
  • sloc: haskell: 1,040; makefile: 6
file content (212 lines) | stat: -rw-r--r-- 6,931 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
{-# LANGUAGE CPP, DeriveDataTypeable, NoImplicitPrelude #-}

#if __GLASGOW_HASKELL__ >= 704
{-# LANGUAGE Safe #-}
#endif

-------------------------------------------------------------------------------
-- |
-- Module     : Control.Concurrent.Broadcast
-- Copyright  : (c) 2010-2011 Bas van Dijk & Roel van Dijk
-- License    : BSD3 (see the file LICENSE)
-- Maintainer : Bas van Dijk <v.dijk.bas@gmail.com>
--            , Roel van Dijk <vandijk.roel@gmail.com>
--
-- A 'Broadcast' is a mechanism for communication between threads. Multiple
-- @'listen'ers@ wait until a broadcaster @'broadcast's@ a value. The listeners
-- block until the value is received. When the broadcaster broadcasts a value
-- all listeners are woken.
--
-- All functions are /exception safe/. Throwing asynchronous exceptions will not
-- compromise the internal state of a broadcast.
--
-- This module is designed to be imported qualified. We suggest importing it
-- like:
--
-- @
-- import           Control.Concurrent.Broadcast              ( Broadcast )
-- import qualified Control.Concurrent.Broadcast as Broadcast ( ... )
-- @
-------------------------------------------------------------------------------

module Control.Concurrent.Broadcast
  ( Broadcast

    -- * Creating broadcasts
  , new
  , newBroadcasting

    -- * Listening to broadcasts
  , listen
  , tryListen
  , listenTimeout

    -- * Broadcasting
  , broadcast
  , signal
  , silence
  ) where


-------------------------------------------------------------------------------
-- Imports
-------------------------------------------------------------------------------

-- from base:
import Control.Monad              ( return, when )
import Control.Concurrent.MVar    ( MVar, newMVar, newEmptyMVar
                                  , takeMVar, putMVar, readMVar, modifyMVar_
                                  )
import Control.Exception          ( onException )
import Data.Eq                    ( Eq )
import Data.Either                ( Either(Left ,Right), either )
import Data.Function              ( ($), (.), const )
import Data.Functor               ( fmap, (<$>) )
import Data.Foldable              ( for_ )
import Data.List                  ( delete, length )
import Data.Maybe                 ( Maybe(Nothing, Just), isNothing )
import Data.Ord                   ( max )
import Data.Typeable              ( Typeable )
import Prelude                    ( Integer, seq )
import System.IO                  ( IO )

#if __GLASGOW_HASKELL__ < 700
import Prelude                    ( fromInteger )
import Control.Monad              ( (>>=), (>>), fail )
import Data.Ord                   ( Ord )
#endif

-- from unbounded-delays:
import Control.Concurrent.Timeout ( timeout )

-- from concurrent-extra (this package):
import Utils                      ( purelyModifyMVar, mask_ )



-------------------------------------------------------------------------------
-- Broadcast
-------------------------------------------------------------------------------

{-|
The state of a broadcast is kept in a single 'MVar' and is always one of:

* \"Silent\": @'listen'ing@ to the broadcast will block until a value is
@'broadcast'ed@.

* \"Broadcasting @x@\": @'listen'ing@ to the broadcast will return the value @x@
without blocking.
-}
newtype Broadcast a = Broadcast
    { -- 'Left' is the silent state and carries one 'MVar' per registered
      -- listener; 'Right' is the broadcasting state and carries the value.
      unBroadcast :: MVar (Either [MVar a] a)
    }
    deriving (Eq, Typeable)

-- | Create a fresh broadcast that starts out in the \"silent\" state, with no
-- listeners registered.
new :: IO (Broadcast a)
new = fmap Broadcast (newMVar (Left []))

-- | @newBroadcasting x@ creates a fresh broadcast that is already in the
-- \"broadcasting @x@\" state.
newBroadcasting :: a -> IO (Broadcast a)
newBroadcasting = fmap Broadcast . newMVar . Right

{-|
Listen to a broadcast.

* If the broadcast is \"broadcasting @x@\", @listen@ will return @x@
immediately.

* If the broadcast is \"silent\", @listen@ will block until another thread
@'broadcast's@ a value to the broadcast.

If the blocked wait is interrupted by an exception, the listener is removed
from the broadcast again before the exception is re-thrown, so interrupted
listeners do not accumulate in a broadcast that stays silent.
-}
listen :: Broadcast a -> IO a
listen (Broadcast mv) = mask_ $ do
  mx <- takeMVar mv
  case mx of
    Left ls -> do
      -- Register a private, empty MVar that the broadcaster will fill.
      l <- newEmptyMVar
      putMVar mv $ Left $ l:ls
      -- A blocking takeMVar can still be interrupted by an asynchronous
      -- exception (assuming mask_ from Utils behaves like
      -- Control.Exception.mask_ -- confirm). Unregister in that case, the
      -- same way listenTimeout does.
      takeMVar l `onException` deleteReader l
    Right x -> do
      putMVar mv mx
      return x
  where
    -- Remove the listener's MVar from the silent state's list; a broadcast
    -- that has meanwhile become "broadcasting" is left untouched.
    deleteReader l = do
      st <- takeMVar mv
      case st of
        Left ls -> let ls' = delete l ls
                   -- Force the spine so the deletion is performed here
                   -- rather than stored as a thunk inside the MVar.
                   in length ls' `seq` putMVar mv (Left ls')
        Right _ -> putMVar mv st

{-|
Non-blocking variant of 'listen'.

* If the broadcast is \"broadcasting @x@\", @tryListen@ returns 'Just' @x@.

* If the broadcast is \"silent\", @tryListen@ returns 'Nothing'.

The state is only read; no listener is registered.
-}
tryListen :: Broadcast a -> IO (Maybe a)
tryListen (Broadcast mv) = do
  state <- readMVar mv
  return $ case state of
    Left _  -> Nothing
    Right x -> Just x

{-|
Like 'listen', but gives up after a given amount of time.

The timeout is specified in microseconds. A result of 'Nothing' means the
timeout expired before a value was received; 'Just' @x@ means @x@ was received.

If the broadcast is \"silent\" and a timeout of 0 &#x3bc;s is specified the
function returns 'Nothing' without blocking.

Negative timeouts are treated the same as a timeout of 0 &#x3bc;s.
-}
listenTimeout :: Broadcast a -> Integer -> IO (Maybe a)
listenTimeout (Broadcast mv) time = mask_ $ do
  state <- takeMVar mv
  case state of
    Right x -> do
      putMVar mv state
      return (Just x)
    Left waiting -> do
      -- Register a private, empty MVar for the broadcaster to fill.
      slot <- newEmptyMVar
      putMVar mv (Left (slot : waiting))
      -- Negative timeouts are clamped to 0. If the wait is aborted by an
      -- exception the slot is unregistered before the exception propagates.
      result <- timeout (max time 0) (takeMVar slot)
                  `onException` unregister slot
      -- On a timeout nobody filled the slot, so take it out of the list.
      when (isNothing result) (unregister slot)
      return result
  where
    -- Remove a listener slot from the silent state's list; a broadcast that
    -- is already "broadcasting" is put back unchanged.
    unregister slot = do
      current <- takeMVar mv
      case current of
        Right _ -> putMVar mv current
        Left waiting ->
          let remaining = delete slot waiting
          -- Force the spine so the deletion happens here instead of being
          -- stored as a thunk inside the MVar.
          in length remaining `seq` putMVar mv (Left remaining)

{-|
Broadcast a value.

@broadcast b x@ wakes every thread that is @'listen'ing@ to the \"silent\"
broadcast @b@, handing each of them @x@, and leaves @b@ in the
\"broadcasting @x@\" state.
-}
broadcast :: Broadcast a -> a -> IO ()
broadcast b x = broadcastThen (Right x) b x

{-|
Broadcast a value before becoming \"silent\".

@signal b x@ wakes every thread that is @'listen'ing@ to the broadcast @b@,
handing each of them @x@, and leaves @b@ in the \"silent\" state with no
listeners registered.

The semantics of signal are equivalent to the following definition:

@
  signal b x = mask_ $ 'broadcast' b x >> 'silence' b
@
-}
signal :: Broadcast a -> a -> IO ()
signal b x = broadcastThen (Left []) b x

-- | Shared implementation of 'broadcast' and 'signal': hand @x@ to every
-- registered listener (if the broadcast is currently \"silent\") and then
-- replace the state with @finalState@.
broadcastThen :: Either [MVar a] a -> Broadcast a -> a -> IO ()
broadcastThen finalState (Broadcast mv) x = modifyMVar_ mv deliver
  where
    -- Silent: fill each listener's private MVar with the value.
    deliver (Left listeners) = do
      for_ listeners $ \l -> putMVar l x
      return finalState
    -- Already broadcasting: nobody is waiting, only the state changes.
    deliver (Right _) = return finalState

-- | Put a broadcast into the \"silent\" state. A broadcast that is already
-- silent keeps its registered listeners; a broadcasting one becomes silent
-- with no listeners.
silence :: Broadcast a -> IO ()
silence (Broadcast mv) = purelyModifyMVar mv toSilent
  where
    -- NOTE(review): purelyModifyMVar comes from Utils; presumably it applies
    -- this pure function to the MVar's contents -- confirm there.
    toSilent (Left listeners) = Left listeners
    toSilent (Right _)        = Left []