1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302
|
{-# LANGUAGE CPP #-}
-- |
-- Module: Control.Concurrent.STM.Delay
-- Copyright: (c) Joseph Adams 2012
-- License: BSD3
-- Maintainer: joeyadams3.14159@gmail.com
-- Portability: Requires GHC 7+
--
-- One-shot timer whose duration can be updated. Think of it as an enhanced
-- version of 'registerDelay'.
--
-- This uses "GHC.Event" when available (GHC 7.2+, @-threaded@, non-Windows OS).
-- Otherwise, it falls back to forked threads and 'threadDelay'.
module Control.Concurrent.STM.Delay (
-- * Managing delays
Delay,
newDelay,
updateDelay,
cancelDelay,
-- * Waiting for expiration
waitDelay,
tryWaitDelay,
tryWaitDelayIO,
-- * Example
-- $example
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception (mask_)
import Control.Monad
#if MIN_VERSION_base(4,4,0) && !mingw32_HOST_OS
import qualified GHC.Event as Ev
#endif
#if MIN_VERSION_base(4,7,0) && !mingw32_HOST_OS
import qualified GHC.Conc as Conc
#endif
-- | A 'Delay' is an updatable timer that rings only once.
data Delay = Delay
{ delayVar :: !(TVar Bool)
, delayUpdate :: !(Int -> IO ())
, delayCancel :: !(IO ())
}
instance Eq Delay where
(==) a b = delayVar a == delayVar b
-- | Create a new 'Delay' that will ring in the given number of microseconds.
newDelay :: Int -> IO Delay
newDelay t
| t > 0 = getDelayImpl t
-- Special case zero timeout, so user can create an
-- already-rung 'Delay' efficiently.
| otherwise = do
var <- newTVarIO True
return Delay
{ delayVar = var
, delayUpdate = \_t -> return ()
, delayCancel = return ()
}
-- | Set an existing 'Delay' to ring in the given number of microseconds
-- (from the time 'updateDelay' is called), rather than when it was going to
-- ring. If the 'Delay' has already rung, do nothing.
updateDelay :: Delay -> Int -> IO ()
updateDelay = delayUpdate
-- | Set a 'Delay' so it will never ring, even if 'updateDelay' is used later.
-- If the 'Delay' has already rung, do nothing.
cancelDelay :: Delay -> IO ()
cancelDelay = delayCancel
-- | Block until the 'Delay' rings. If the 'Delay' has already rung,
-- return immediately.
waitDelay :: Delay -> STM ()
waitDelay delay = do
expired <- tryWaitDelay delay
if expired then return ()
else retry
-- | Non-blocking version of 'waitDelay'.
-- Return 'True' if the 'Delay' has rung.
tryWaitDelay :: Delay -> STM Bool
tryWaitDelay = readTVar . delayVar
-- | Faster version of @'atomically' . 'tryWaitDelay'@. See 'readTVarIO'.
--
-- Since 0.1.1
tryWaitDelayIO :: Delay -> IO Bool
tryWaitDelayIO = readTVarIO . delayVar
------------------------------------------------------------------------
-- Drivers
getDelayImpl :: Int -> IO Delay
#if MIN_VERSION_base(4,7,0) && !mingw32_HOST_OS
getDelayImpl t0 = do
Conc.ensureIOManagerIsRunning
m <- Ev.getSystemEventManager
case m of
Nothing -> implThread t0
Just _ -> do
mgr <- Ev.getSystemTimerManager
implEvent mgr t0
#elif MIN_VERSION_base(4,4,0) && !mingw32_HOST_OS
getDelayImpl t0 = do
m <- Ev.getSystemEventManager
case m of
Nothing -> implThread t0
Just mgr -> implEvent mgr t0
#else
getDelayImpl = implThread
#endif
#if MIN_VERSION_base(4,7,0) && !mingw32_HOST_OS
-- | Use the timeout API in "GHC.Event" via TimerManager
--implEvent :: Ev.TimerManager -> Int -> IO Delay
implEvent mgr t0 = do
var <- newTVarIO False
k <- Ev.registerTimeout mgr t0 $ atomically $ writeTVar var True
return Delay
{ delayVar = var
, delayUpdate = Ev.updateTimeout mgr k
, delayCancel = Ev.unregisterTimeout mgr k
}
#elif MIN_VERSION_base(4,4,0) && !mingw32_HOST_OS
-- | Use the timeout API in "GHC.Event"
implEvent :: Ev.EventManager -> Int -> IO Delay
implEvent mgr t0 = do
var <- newTVarIO False
k <- Ev.registerTimeout mgr t0 $ atomically $ writeTVar var True
return Delay
{ delayVar = var
, delayUpdate = Ev.updateTimeout mgr k
, delayCancel = Ev.unregisterTimeout mgr k
}
#endif
-- | Use threads and threadDelay:
--
-- [init]
-- Fork a thread to wait the given length of time, then set the TVar.
--
-- [delayUpdate]
-- Stop the existing thread and (unless the delay has been canceled)
-- fork a new thread.
--
-- [delayCancel]
-- Stop the existing thread, if any.
implThread :: Int -> IO Delay
implThread t0 = do
var <- newTVarIO False
let new t = forkTimeoutThread t $ atomically $ writeTVar var True
mv <- new t0 >>= newMVar . Just
return Delay
{ delayVar = var
, delayUpdate = replaceThread mv . fmap Just . new
, delayCancel = replaceThread mv $ return Nothing
}
replaceThread :: MVar (Maybe TimeoutThread)
-> IO (Maybe TimeoutThread)
-> IO ()
replaceThread mv new =
join $ mask_ $ do
m <- takeMVar mv
case m of
Nothing -> do
-- Don't create a new timer thread after the 'Delay' has
-- been canceled. Otherwise, the behavior is inconsistent
-- with GHC.Event.
putMVar mv Nothing
return (return ())
Just tt -> do
m' <- stopTimeoutThread tt
case m' of
Nothing -> do
-- Timer already rang (or will ring very soon).
-- Don't start a new timer thread, as it would
-- waste resources and have no externally
-- observable effect.
putMVar mv Nothing
return $ return ()
Just kill -> do
new >>= putMVar mv
return kill
------------------------------------------------------------------------
-- TimeoutThread
data TimeoutThread = TimeoutThread !ThreadId !(MVar ())
-- | Fork a thread to perform an action after the given number of
-- microseconds.
--
-- 'forkTimeoutThread' is non-interruptible.
forkTimeoutThread :: Int -> IO () -> IO TimeoutThread
forkTimeoutThread t io = do
mv <- newMVar ()
tid <- compat_forkIOUnmasked $ do
threadDelay t
m <- tryTakeMVar mv
-- If m is Just, this thread will not be interrupted,
-- so no need for a 'mask' between the tryTakeMVar and the action.
case m of
Nothing -> return ()
Just _ -> io
return (TimeoutThread tid mv)
-- | Prevent the 'TimeoutThread' from performing its action. If it's too late,
-- return 'Nothing'. Otherwise, return an action (namely, 'killThread') for
-- cleaning up the underlying thread.
--
-- 'stopTimeoutThread' has a nice property: it is /non-interruptible/.
-- This means that, in an exception 'mask', it will not poll for exceptions.
-- See "Control.Exception" for more info.
--
-- However, the action returned by 'stopTimeoutThread' /does/ poll for
-- exceptions. That's why 'stopTimeoutThread' returns this action rather than
-- simply doing it. This lets the caller do it outside of a critical section.
stopTimeoutThread :: TimeoutThread -> IO (Maybe (IO ()))
stopTimeoutThread (TimeoutThread tid mv) =
maybe Nothing (\_ -> Just (killThread tid)) `fmap` tryTakeMVar mv
------------------------------------------------------------------------
-- Compatibility
compat_forkIOUnmasked :: IO () -> IO ThreadId
#if MIN_VERSION_base(4,4,0)
compat_forkIOUnmasked io = forkIOWithUnmask (\_ -> io)
#else
compat_forkIOUnmasked = forkIOUnmasked
#endif
------------------------------------------------------------------------
{- $example
Suppose we are managing a network connection, and want to time it out if no
messages are received in over five minutes. We'll create a 'Delay', and an
action to \"bump\" it:
@
let timeoutInterval = 5 * 60 * 1000000 :: 'Int'
delay <- 'newDelay' timeoutInterval
let bump = 'updateDelay' delay timeoutInterval
@
This way, the 'Delay' will ring if it is not bumped for longer than
five minutes.
Now we fork the receiver thread:
@
dead <- 'newEmptyTMVarIO'
_ <- 'forkIO' $
('forever' $ do
msg <- recvMessage
bump
handleMessage msg
) \`finally\` 'atomically' ('putTMVar' dead ())
@
Finally, we wait for the delay to ring, or for the receiver thread to fail due
to an exception:
@
'atomically' $ 'waitDelay' delay \`orElse\` 'readTMVar' dead
@
Warning:
* If /handleMessage/ blocks, the 'Delay' may ring due to @handleMessage@
taking too long, rather than just @recvMessage@ taking too long.
* The loop will continue to run until you do something to stop it.
It might be simpler to use "System.Timeout" instead:
@
m <- 'System.Timeout.timeout' timeoutInterval recvMessage
case m of
Nothing -> 'fail' \"timed out\"
Just msg -> handleMessage msg
@
However, using a 'Delay' has the following advantages:
* If @recvMessage@ makes a blocking FFI call (e.g. network I/O on Windows),
'System.Timeout.timeout' won't work, since it uses an asynchronous
exception, and FFI calls can't be interrupted with async exceptions.
The 'Delay' approach lets you handle the timeout in another thread,
while the FFI call is still blocked.
* 'updateDelay' is more efficient than 'System.Timeout.timeout' when
"GHC.Event" is available.
-}
|