File: Thread.hs

package info (click to toggle)
haskell-auto-update 0.2.6-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 144 kB
  • sloc: haskell: 677; makefile: 2
file content (133 lines) | stat: -rw-r--r-- 5,459 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
module Control.AutoUpdate.Thread (
    -- * Creation
    mkAutoUpdate,
    mkAutoUpdateWithModify,
) where

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (
    newEmptyMVar,
    putMVar,
    readMVar,
    takeMVar,
    tryPutMVar,
 )
import Control.Exception (
    SomeException,
    catch,
    mask_,
    throw,
    try,
 )
import Control.Monad (void)
import Data.IORef (newIORef, readIORef, writeIORef)
import Data.Maybe (fromMaybe)
import GHC.Conc.Sync (labelThread)

import Control.AutoUpdate.Types

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread.
--
-- @since 0.1.0
mkAutoUpdate :: UpdateSettings a -> IO (IO a)
mkAutoUpdate us = mkAutoUpdateHelper us Nothing

-- | Generate an action which will either read from an automatically
-- updated value, or run the update action in the current thread if
-- the first time or the provided modify action after that.
--
-- @since 0.1.4
mkAutoUpdateWithModify :: UpdateSettings a -> (a -> IO a) -> IO (IO a)
mkAutoUpdateWithModify us f = mkAutoUpdateHelper us (Just f)

mkAutoUpdateHelper :: UpdateSettings a -> Maybe (a -> IO a) -> IO (IO a)
mkAutoUpdateHelper us updateActionModify = do
    -- A baton to tell the worker thread to generate a new value.
    needsRunning <- newEmptyMVar

    -- The initial response variable. Response variables allow the requesting
    -- thread to block until a value is generated by the worker thread.
    responseVar0 <- newEmptyMVar

    -- The current value, if available. We start off with a Left value
    -- indicating no value is available, and the above-created responseVar0 to
    -- give a variable to block on.
    currRef <- newIORef $ Left responseVar0

    -- This is used to set a value in the currRef variable when the worker
    -- thread exits. In reality, that value should never be used, since the
    -- worker thread exiting only occurs if an async exception is thrown, which
    -- should only occur if there are no references to needsRunning left.
    -- However, this handler will make error messages much clearer if there's a
    -- bug in the implementation.
    let fillRefOnExit f = do
            eres <- try f
            case eres of
                Left e ->
                    writeIORef currRef $
                        error $
                            "Control.AutoUpdate.mkAutoUpdate: worker thread exited with exception: "
                                ++ show (e :: SomeException)
                Right () ->
                    writeIORef currRef $
                        error $
                            "Control.AutoUpdate.mkAutoUpdate: worker thread exited normally, "
                                ++ "which should be impossible due to usage of infinite loop"

    -- fork the worker thread immediately. Note that we mask async exceptions,
    -- but *not* in an uninterruptible manner. This will allow a
    -- BlockedIndefinitelyOnMVar exception to still be thrown, which will take
    -- down this thread when all references to the returned function are
    -- garbage collected, and therefore there is no thread that can fill the
    -- needsRunning MVar.
    --
    -- Note that since we throw away the ThreadId of this new thread and never
    -- calls myThreadId, normal async exceptions can never be thrown to it,
    -- only RTS exceptions.
    tid <- mask_ $ forkIO $ fillRefOnExit $ do
        -- This infinite loop makes up out worker thread. It takes an a
        -- responseVar value where the next value should be putMVar'ed to for
        -- the benefit of any requesters currently blocked on it.
        let loop responseVar maybea = do
                -- block until a value is actually needed
                takeMVar needsRunning

                -- new value requested, so run the updateAction
                a <- catchSome $ fromMaybe (updateAction us) (updateActionModify <*> maybea)

                -- we got a new value, update currRef and lastValue
                writeIORef currRef $ Right a
                putMVar responseVar a

                -- delay until we're needed again
                threadDelay $ updateFreq us

                -- delay's over. create a new response variable and set currRef
                -- to use it, so that the next requester will block on that
                -- variable. Then loop again with the updated response
                -- variable.
                responseVar' <- newEmptyMVar
                writeIORef currRef $ Left responseVar'
                loop responseVar' (Just a)

        -- Kick off the loop, with the initial responseVar0 variable.
        loop responseVar0 Nothing
    labelThread tid $ updateThreadName us
    return $ do
        mval <- readIORef currRef
        case mval of
            Left responseVar -> do
                -- no current value, force the worker thread to run...
                void $ tryPutMVar needsRunning ()

                -- and block for the result from the worker
                readMVar responseVar
            -- we have a current value, use it
            Right val -> return val

-- | Turn a runtime exception into an impure exception, so that all 'IO'
-- actions will complete successfully. This simply defers the exception until
-- the value is forced.
catchSome :: IO a -> IO a
catchSome act = Control.Exception.catch act $ \e -> return $ throw (e :: SomeException)