File: BoundedChan.hs

package info (click to toggle)
haskell-boundedchan 1.0.3.0-13
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 60 kB
  • sloc: haskell: 91; makefile: 2
file content (147 lines) | stat: -rw-r--r-- 5,981 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
-- |Implements bounded channels. These channels differ from normal 'Chan's in
-- that they are guaranteed to contain no more than a certain number of
-- elements. This is ideal when you may be writing to a channel faster than you
-- are able to read from it.
--
-- This module supports all the functions of "Control.Concurrent.Chan" except
-- 'unGetChan' and 'dupChan', which are not supported for bounded channels.
--
-- Extra consistency: This version enforces that if thread Alice writes
-- e1 followed by e2 then e1 will be returned by readChan before e2.
-- Conversely, if thread Bob reads e1 followed by e2 then it was true that
-- writeChan e1 preceded writeChan e2.
--
-- Previous versions did not enforce this consistency: if writeChan were
-- preempted between putMVars or killThread arrived between putMVars then it
-- can fail.  Similarly it might fail if readChan were stopped after putMVar
-- and before the second takeMVar.  An unlucky pattern of several such deaths
-- might actually break the invariants of the array in an unrecoverable way
-- causing all future reads and writes to block.
module Control.Concurrent.BoundedChan(
         BoundedChan
       , newBoundedChan
       , writeChan
       , tryWriteChan
       , readChan
       , tryReadChan
       , isEmptyChan
       , getChanContents
       , writeList2Chan
       )
  where

import Control.Concurrent.MVar (MVar, isEmptyMVar, newEmptyMVar, newMVar,
                                putMVar, tryPutMVar, takeMVar, tryTakeMVar)
import Control.Exception       (mask_, onException)
import Control.Monad           (replicateM)
import Data.Array              (Array, (!), listArray)
import System.IO.Unsafe        (unsafeInterleaveIO)

-- |'BoundedChan' is an abstract data type representing a bounded channel.
--
-- The channel is a fixed array of 'MVar' slots plus two position counters.
-- A full slot holds an element that was written but not yet read; an empty
-- slot is free for the next writer.
data BoundedChan a = BC {
       -- Number of slots; the positions below wrap around modulo this value.
       _size     :: Int
       -- The slots themselves, indexed from 0.
     , _contents :: Array Int (MVar a)
       -- Index of the slot the next write targets.  Writers hold this 'MVar'
       -- while they write, so it also serialises them.
     , _writePos :: MVar Int
       -- Index of the slot the next read targets.  Readers hold this 'MVar'
       -- while they read, so it also serialises them.
     , _readPos  :: MVar Int
     }

-- Versions of modifyMVar and withMVar that do not 'restore' the previous mask state when running
-- 'io', with added modification strictness.  The lack of 'restore' may make these perform better
-- than the normal version.  Moving strictness here makes using them more pleasant.
{-# INLINE modifyMVar_mask #-}
-- Take the value out of @m@, run @io@ on it with asynchronous exceptions
-- masked, store the new state back (forced to WHNF) and return the extra
-- result.
--
-- The pair returned by @io@ and its first component are forced /inside/ the
-- exception handler.  If either is bottom, the old value is put back before the
-- exception propagates.  Forcing them after the handler (as @putMVar m $! a'@
-- would) leaves @m@ empty and blocks every later user of the 'MVar'.
modifyMVar_mask :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVar_mask m io =
  mask_ $ do
    a <- takeMVar m
    (a',b) <- (io a >>= forceState) `onException` putMVar m a
    putMVar m a'  -- a' was already forced by forceState
    return b
  where
    -- Force the pair and the new state to WHNF; the second component stays
    -- lazy, exactly as before.
    forceState r@(s, _) = s `seq` return r

{-# INLINE modifyMVar_mask_ #-}
-- Take the value out of @m@, run @io@ on it with asynchronous exceptions
-- masked, and store the result back forced to WHNF.
--
-- The new value is forced /inside/ the exception handler (@return $!@), so a
-- result that is bottom restores the old value before the exception
-- propagates.  Forcing it at the final 'putMVar' instead would leave @m@ empty
-- and block every later user of the 'MVar'.
modifyMVar_mask_ :: MVar a -> (a -> IO a) -> IO ()
modifyMVar_mask_ m io =
  mask_ $ do
    a <- takeMVar m
    a' <- (io a >>= (return $!)) `onException` putMVar m a
    putMVar m a'  -- a' is already in WHNF here

{-# INLINE withMVar_mask #-}
-- Borrow the value held in @m@ while @io@ runs, with asynchronous exceptions
-- masked.  The same value is put back afterwards, whether @io@ returns
-- normally or throws.
withMVar_mask :: MVar a -> (a -> IO b) -> IO b
withMVar_mask m io = mask_ $ do
  held   <- takeMVar m
  result <- onException (io held) (putMVar m held)
  putMVar m held
  return result

-- |@newBoundedChan n@ returns a channel that can contain no more than @n@
-- elements.
--
-- One empty 'MVar' slot is allocated per element, and both the read and the
-- write position start at slot 0.
--
-- NOTE(review): a non-positive @n@ produces an empty slot array, so the
-- channel operations that index into it would presumably fail; callers are
-- expected to pass @n >= 1@.
newBoundedChan :: Int -> IO (BoundedChan a)
newBoundedChan x = do
  slots   <- replicateM x newEmptyMVar
  writeIx <- newMVar 0
  readIx  <- newMVar 0
  return BC
    { _size     = x
    , _contents = listArray (0, x - 1) slots
    , _writePos = writeIx
    , _readPos  = readIx
    }

-- |Write an element to the channel. If the channel is full, this routine will
-- block until it is able to write.  Blocked writers wait in a fair FIFO queue.
writeChan :: BoundedChan a -> a -> IO ()
writeChan (BC cap slots wLock _) x = modifyMVar_mask_ wLock fillSlot
  where
    -- The write position is only advanced once the slot has accepted the
    -- element; if 'putMVar' is interrupted the position stays where it was.
    fillSlot ix = do
      putMVar (slots ! ix) x
      return (succ ix `mod` cap)

-- |A variant of 'writeChan' which, instead of blocking when the channel is
-- full, gives up and leaves the channel unchanged. The result is 'True' if the
-- element was written and 'False' otherwise. Note that this routine can still
-- block while waiting for write access to the channel.
tryWriteChan :: BoundedChan a -> a -> IO Bool
tryWriteChan (BC cap slots wLock _) x = modifyMVar_mask wLock attempt
  where
    attempt ix = do
      stored <- tryPutMVar (slots ! ix) x
      -- Only move on to the next slot when this one accepted the element.
      let next | stored    = succ ix `mod` cap
               | otherwise = ix
      return (next, stored)

-- |Read an element from the channel. If the channel is empty, this routine
-- will block until it is able to read.  Blocked readers wait in a fair FIFO
-- queue.
readChan :: BoundedChan a -> IO a
readChan (BC cap slots _ rLock) = modifyMVar_mask rLock drainSlot
  where
    -- The read position is only advanced once an element has actually been
    -- taken; if 'takeMVar' is interrupted the position stays where it was.
    drainSlot ix = do
      item <- takeMVar (slots ! ix)
      return (succ ix `mod` cap, item)

-- |A variant of 'readChan' which, instead of blocking when the channel is
-- empty, immediately returns 'Nothing'. Otherwise, 'tryReadChan' returns
-- @'Just' a@ where @a@ is the element read from the channel. Note that this
-- routine can still block while waiting for read access to the channel.
tryReadChan :: BoundedChan a -> IO (Maybe a)
tryReadChan (BC cap slots _ rLock) = modifyMVar_mask rLock attempt
  where
    attempt ix = do
      taken <- tryTakeMVar (slots ! ix)
      -- Only move on to the next slot when an element was actually taken.
      let next = maybe ix (const (succ ix `mod` cap)) taken
      return (next, taken)

-- |Returns 'True' if the slot at the current read position is empty, i.e. the
-- supplied channel has nothing to read.
--
-- DANGER: this needs read access to the channel, so it blocks for as long as
-- another reader holds it -- in particular while a reader is itself blocked on
-- an empty channel.
--
-- DEPRECATED
{-# DEPRECATED isEmptyChan "This isEmptyChan can block, no non-blocking substitute yet" #-}
isEmptyChan :: BoundedChan a -> IO Bool
isEmptyChan (BC _ slots _ rLock) =
  withMVar_mask rLock (isEmptyMVar . (slots !))

-- |Return a lazy list representing the contents of the supplied channel.
-- Each element is read from the channel only when the corresponding part of
-- the list is demanded, so competing readers might steal elements from this
-- list.
getChanContents :: BoundedChan a -> IO [a]
getChanContents ch = unsafeInterleaveIO $
  readChan ch >>= \firstItem ->
    -- The recursive call is itself deferred, so this returns immediately.
    fmap (firstItem :) (getChanContents ch)

-- |Write a list of elements to the channel, in order, one 'writeChan' per
-- element. If the channel becomes full, this routine will block until it is
-- able to write.  Competing writers may interleave with this one.
writeList2Chan :: BoundedChan a -> [a] -> IO ()
writeList2Chan ch items = mapM_ (writeChan ch) items