File: Reaper.hs

package info (click to toggle)
haskell-auto-update 0.1.6-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 112 kB
  • sloc: haskell: 373; makefile: 2
file content (280 lines) | stat: -rw-r--r-- 10,168 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
{-# LANGUAGE RecordWildCards    #-}
{-# LANGUAGE BangPatterns       #-}

-- | This module provides the ability to create reapers: dedicated cleanup
-- threads. These threads will automatically spawn and die based on the
-- presence of a workload to process on. Example uses include:
--
-- * Killing long-running jobs
-- * Closing unused connections in a connection pool
-- * Pruning a cache of old items (see example below)
--
-- For real-world usage, search the <https://github.com/yesodweb/wai WAI family of packages>
-- for imports of "Control.Reaper".
module Control.Reaper (
      -- * Example: Regularly cleaning a cache
      -- $example1

      -- * Settings
      ReaperSettings
    , defaultReaperSettings
      -- * Accessors
    , reaperAction
    , reaperDelay
    , reaperCons
    , reaperNull
    , reaperEmpty
      -- * Type
    , Reaper(..)
      -- * Creation
    , mkReaper
      -- * Helper
    , mkListAction
    ) where

import Control.AutoUpdate.Util (atomicModifyIORef')
import Control.Concurrent (forkIO, threadDelay, killThread, ThreadId)
import Control.Exception (mask_)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- | Settings for creating a reaper. This type has two parameters:
-- @workload@ gives the entire workload, whereas @item@ gives an
-- individual piece of the queue. A common approach is to have @workload@
-- be a list of @item@s. This is encouraged by 'defaultReaperSettings' and
-- 'mkListAction'.
--
-- @since 0.1.1
data ReaperSettings workload item = ReaperSettings
    { reaperAction :: workload -> IO (workload -> workload)
    -- ^ The action to perform on a workload. The result of this is a
    -- \"workload modifying\" function. In the common case of using lists,
    -- the result should be a difference list that prepends the remaining
    -- workload to the temporary workload. The temporary workload here
    -- refers to items added to the workload while the reaper action is
    -- running. For help with setting up such an action, see 'mkListAction'.
    --
    -- Default: do nothing with the workload, and then prepend it to the
    -- temporary workload. This is incredibly useless; you should
    -- definitely override this default.
    --
    -- @since 0.1.1
    , reaperDelay :: {-# UNPACK #-} !Int
    -- ^ Number of microseconds to delay between calls of 'reaperAction'.
    --
    -- Default: 30 seconds.
    --
    -- @since 0.1.1
    , reaperCons :: item -> workload -> workload
    -- ^ Add an item onto a workload.
    --
    -- Default: list consing.
    --
    -- @since 0.1.1
    , reaperNull :: workload -> Bool
    -- ^ Check if a workload is empty, in which case the worker thread
    -- will shut down.
    --
    -- Default: 'null'.
    --
    -- @since 0.1.1
    , reaperEmpty :: workload
    -- ^ An empty workload.
    --
    -- Default: empty list.
    --
    -- @since 0.1.1
    }

-- | Default @ReaperSettings@ value, biased towards having a list of work
-- items.
--
-- @since 0.1.1
defaultReaperSettings :: ReaperSettings [item] item
defaultReaperSettings = ReaperSettings
    { reaperAction = \wl -> return (wl ++)
    , reaperDelay = 30000000
    , reaperCons = (:)
    , reaperNull = null
    , reaperEmpty = []
    }

-- | A data structure to hold reaper APIs.
data Reaper workload item = Reaper {
    -- | Adding an item to the workload
    reaperAdd  :: item -> IO ()
    -- | Reading workload.
  , reaperRead :: IO workload
    -- | Stopping the reaper thread if exists.
    --   The current workload is returned.
  , reaperStop :: IO workload
    -- | Killing the reaper thread immediately if exists.
  , reaperKill :: IO ()
  }

-- | State of reaper.
data State workload = NoReaper           -- ^ No reaper thread
                    | Workload !workload  -- ^ The current jobs

-- | Create a reaper addition function. This function can be used to add
-- new items to the workload. Spawning of reaper threads will be handled
-- for you automatically.
--
-- @since 0.1.1
mkReaper :: ReaperSettings workload item -> IO (Reaper workload item)
mkReaper settings@ReaperSettings{..} = do
    stateRef <- newIORef NoReaper
    tidRef   <- newIORef Nothing
    return Reaper {
        reaperAdd  = add settings stateRef tidRef
      , reaperRead = readRef stateRef
      , reaperStop = stop stateRef
      , reaperKill = kill tidRef
      }
  where
    readRef stateRef = do
        mx <- readIORef stateRef
        case mx of
            NoReaper    -> return reaperEmpty
            Workload wl -> return wl
    stop stateRef = atomicModifyIORef' stateRef $ \mx ->
        case mx of
            NoReaper   -> (NoReaper, reaperEmpty)
            Workload x -> (Workload reaperEmpty, x)
    kill tidRef = do
        mtid <- readIORef tidRef
        case mtid of
            Nothing  -> return ()
            Just tid -> killThread tid

add :: ReaperSettings workload item
    -> IORef (State workload) -> IORef (Maybe ThreadId)
    -> item -> IO ()
add settings@ReaperSettings{..} stateRef tidRef item =
    mask_ $ do
      next <- atomicModifyIORef' stateRef cons
      next
  where
    cons NoReaper      = let wl = reaperCons item reaperEmpty
                         in (Workload wl, spawn settings stateRef tidRef)
    cons (Workload wl) = let wl' = reaperCons item wl
                         in (Workload wl', return ())

spawn :: ReaperSettings workload item
      -> IORef (State workload) -> IORef (Maybe ThreadId)
      -> IO ()
spawn settings stateRef tidRef = do
    tid <- forkIO $ reaper settings stateRef tidRef
    writeIORef tidRef $ Just tid

reaper :: ReaperSettings workload item
       -> IORef (State workload) -> IORef (Maybe ThreadId)
       -> IO ()
reaper settings@ReaperSettings{..} stateRef tidRef = do
    threadDelay reaperDelay
    -- Getting the current jobs. Push an empty job to the reference.
    wl <- atomicModifyIORef' stateRef swapWithEmpty
    -- Do the jobs. A function to merge the left jobs and
    -- new jobs is returned.
    !merge <- reaperAction wl
    -- Merging the left jobs and new jobs.
    -- If there is no jobs, this thread finishes.
    next <- atomicModifyIORef' stateRef (check merge)
    next
  where
    swapWithEmpty NoReaper      = error "Control.Reaper.reaper: unexpected NoReaper (1)"
    swapWithEmpty (Workload wl) = (Workload reaperEmpty, wl)

    check _ NoReaper   = error "Control.Reaper.reaper: unexpected NoReaper (2)"
    check merge (Workload wl)
      -- If there is no job, reaper is terminated.
      | reaperNull wl' = (NoReaper, writeIORef tidRef Nothing)
      -- If there are jobs, carry them out.
      | otherwise      = (Workload wl', reaper settings stateRef tidRef)
      where
        wl' = merge wl

-- | A helper function for creating 'reaperAction' functions. You would
-- provide this function with a function to process a single work item and
-- return either a new work item, or @Nothing@ if the work item is
-- expired.
--
-- @since 0.1.1
mkListAction :: (item -> IO (Maybe item'))
             -> [item]
             -> IO ([item'] -> [item'])
mkListAction f =
    go id
  where
    go !front [] = return front
    go !front (x:xs) = do
        my <- f x
        let front' =
                case my of
                    Nothing -> front
                    Just y  -> front . (y:)
        go front' xs

-- $example1
-- In this example code, we use a 'Data.Map.Strict.Map' to cache fibonacci numbers, and a 'Reaper' to prune the cache.
--
-- The @main@ function first creates a 'Reaper', with fields to initialize the
-- cache ('reaperEmpty'), add items to it ('reaperCons'), and prune it ('reaperAction').
-- The reaper will run every two seconds ('reaperDelay'), but will stop running while
-- 'reaperNull' is true.
--
-- @main@ then loops infinitely ('Control.Monad.forever'). Each second it calculates the fibonacci number
-- for a value between 30 and 34, first trying the cache ('reaperRead' and 'Data.Map.Strict.lookup'),
-- then falling back to manually calculating it (@fib@)
-- and updating the cache with the result ('reaperAdd')
--
-- @clean@ simply removes items cached for more than 10 seconds.
-- This function is where you would perform IO-related cleanup,
-- like killing threads or closing connections, if that was the purpose of your reaper.
--
-- @
-- module Main where
--
-- import "Data.Time" (UTCTime, getCurrentTime, diffUTCTime)
-- import "Control.Reaper"
-- import "Control.Concurrent" (threadDelay)
-- import "Data.Map.Strict" (Map)
-- import qualified "Data.Map.Strict" as Map
-- import "Control.Monad" (forever)
-- import "System.Random" (getStdRandom, randomR)
--
-- fib :: 'Int' -> 'Int'
-- fib 0 = 0
-- fib 1 = 1
-- fib n = fib (n-1) + fib (n-2)
--
-- type Cache = 'Data.Map.Strict.Map' 'Int' ('Int', 'Data.Time.Clock.UTCTime')
--
-- main :: IO ()
-- main = do
--   reaper <- 'mkReaper' 'defaultReaperSettings'
--     { 'reaperEmpty' = Map.'Data.Map.Strict.empty'
--     , 'reaperCons' = \\(k, v, time) workload -> Map.'Data.Map.Strict.insert' k (v, time) workload
--     , 'reaperAction' = clean
--     , 'reaperDelay' = 1000000 * 2 -- Clean every 2 seconds
--     , 'reaperNull' = Map.'Data.Map.Strict.null'
--     }
--   forever $ do
--     fibArg <- 'System.Random.getStdRandom' ('System.Random.randomR' (30,34))
--     cache <- 'reaperRead' reaper
--     let cachedResult = Map.'Data.Map.Strict.lookup' fibArg cache
--     case cachedResult of
--       'Just' (fibResult, _createdAt) -> 'putStrLn' $ "Found in cache: `fib " ++ 'show' fibArg ++ "` " ++ 'show' fibResult
--       'Nothing' -> do
--         let fibResult = fib fibArg
--         'putStrLn' $ "Calculating `fib " ++ 'show' fibArg ++ "` " ++ 'show' fibResult
--         time <- 'Data.Time.Clock.getCurrentTime'
--         ('reaperAdd' reaper) (fibArg, fibResult, time)
--     'threadDelay' 1000000 -- 1 second
--
-- -- Remove items > 10 seconds old
-- clean :: Cache -> IO (Cache -> Cache)
-- clean oldMap = do
--   currentTime <- 'Data.Time.Clock.getCurrentTime'
--   let pruned = Map.'Data.Map.Strict.filter' (\\(_, createdAt) -> currentTime \`diffUTCTime\` createdAt < 10.0) oldMap
--   return (\\newData -> Map.'Data.Map.Strict.union' pruned newData)
-- @