File: KeyedPool.hs

package info (click to toggle)
haskell-http-client 0.7.17-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 528 kB
  • sloc: haskell: 4,029; makefile: 3
file content (325 lines) | stat: -rw-r--r-- 12,490 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
{-# LANGUAGE DeriveFoldable #-}
{-# LANGUAGE ScopedTypeVariables #-}
-- | Similar to Data.Pool from resource-pool, but resources are
-- identified by some key. To clarify semantics of this module:
--
-- * The pool holds onto and tracks idle resources. Active resources
-- (those checked out via 'takeKeyedPool') are not tracked at all by
-- 'KeyedPool' itself.
--
-- * The pool limits the number of idle resources per key and the
-- total number of idle resources.
--
-- * There is no limit placed on /active/ resources. As such: there
-- will be no delay when calling 'takeKeyedPool': it will either use
-- an idle resource already present, or create a new one
-- immediately.
--
-- * Once the garbage collector cleans up the 'kpAlive' value, the
-- pool will be shut down, by placing a 'PoolClosed' into the
-- 'kpVar' and destroying all existing idle resources.
--
-- * A reaper thread will destroy unused idle resources regularly. It
-- will stop running once 'kpVar' contains a 'PoolClosed' value.
--
-- * 'takeKeyedPool' is async exception safe, but relies on the
-- /caller/ to ensure prompt cleanup. See its comment for more
-- information.
module Data.KeyedPool
    ( KeyedPool
    , createKeyedPool
    , takeKeyedPool
    , Managed
    , managedResource
    , managedReused
    , managedRelease
    , keepAlive
    , Reuse (..)
    , dummyManaged
    ) where

import Control.Concurrent (forkIOWithUnmask, threadDelay)
import Control.Concurrent.STM
import Control.Exception (mask_, catch, SomeException)
import Control.Monad (join, unless, void)
import Data.Map (Map)
import Data.Maybe (isJust)
import qualified Data.Map.Strict as Map
import Data.Time (UTCTime, getCurrentTime, addUTCTime)
import Data.IORef (IORef, newIORef, mkWeakIORef, readIORef)
import qualified Data.Foldable as F
import GHC.Conc (unsafeIOToSTM)
import System.IO.Unsafe (unsafePerformIO)

-- | A pool of idle resources grouped by key. Bundles the user-supplied
-- create and destroy actions, the two idle limits, and the shared
-- pool state.
data KeyedPool key resource = KeyedPool
    { kpCreate :: !(key -> IO resource)
      -- ^ Create a fresh resource for the given key.
    , kpDestroy :: !(resource -> IO ())
      -- ^ Destroy a resource that will not be reused.
    , kpMaxPerKey :: !Int
      -- ^ Limit on the number of idle resources kept per key.
    , kpMaxTotal :: !Int
      -- ^ Limit on the number of idle resources kept across all keys.
    , kpVar :: !(TVar (PoolMap key resource))
      -- ^ Current pool state. Also referenced by the reaper thread and
      -- by the finalizer installed in 'createKeyedPool'.
    , kpAlive :: !(IORef ())
      -- ^ Liveness token. 'createKeyedPool' attaches a weak finalizer
      -- to it that closes the pool.
    }

-- | The state stored in 'kpVar': either the pool has been closed, or
-- it is open and holds the idle resources grouped by key.
--
-- The derived 'F.Foldable' instance is what lets 'destroyKeyedPool''
-- visit every idle resource with a single traversal.
data PoolMap key resource
    = PoolClosed
    | PoolOpen
        -- Total number of resources in the pool
        {-# UNPACK #-} !Int
        !(Map key (PoolList resource))
    deriving F.Foldable

-- | A non-empty list which keeps track of its own length and of the
-- time at which each resource was placed into the pool (the timestamp
-- is supplied by 'putResource' when a resource is returned). New
-- entries are added at the front, so timestamps run from newest to
-- oldest.
data PoolList a
    = One a {-# UNPACK #-} !UTCTime
    | Cons
        a

        -- size of the list from this point and on
        {-# UNPACK #-} !Int

        {-# UNPACK #-} !UTCTime
        !(PoolList a)
    deriving F.Foldable

-- | Flatten a 'PoolList' into an ordinary list of @(timestamp,
-- resource)@ pairs, front of the list first. The cached sizes stored
-- in 'Cons' cells are dropped.
plistToList :: PoolList a -> [(UTCTime, a)]
plistToList cell =
    case cell of
        One x stamp -> [(stamp, x)]
        Cons x _ stamp more -> (stamp, x) : plistToList more

-- | Rebuild a 'PoolList' from @(timestamp, resource)@ pairs, front of
-- the list first. Yields 'Nothing' for empty input, since a
-- 'PoolList' cannot be empty.
plistFromList :: [(UTCTime, a)] -> Maybe (PoolList a)
plistFromList [] = Nothing
plistFromList ((stamp0, x0) : others) = Just (snd (build stamp0 x0 others))
  where
    -- Builds the list for one entry followed by the remaining ones,
    -- returning its length alongside it so that every 'Cons' cell can
    -- record the size of the list starting at that cell.
    build stamp x [] = (1, One x stamp)
    build stamp x ((stamp', x') : more) =
        let (tailLen, tailList) = build stamp' x' more
            len = tailLen + 1
         in len `seq` (len, Cons x len stamp tailList)

-- | Create a new 'KeyedPool' which will automatically clean up after
-- itself when all references to the 'KeyedPool' are gone. It will
-- also fork a reaper thread to regularly kill off unused resources.
createKeyedPool
    :: Ord key
    => (key -> IO resource) -- ^ create a new resource
    -> (resource -> IO ())
       -- ^ Destroy a resource. Note that exceptions thrown by this will be
       -- silently discarded. If you want reporting, please install an
       -- exception handler yourself.
    -> Int -- ^ number of resources per key to allow in the pool
    -> Int -- ^ number of resources to allow in the pool across all keys
    -> (SomeException -> IO ()) -- ^ what to do if the reaper throws an exception
    -> IO (KeyedPool key resource)
createKeyedPool create destroy maxPerKey maxTotal onReaperException = do
    -- The pool starts out open and empty.
    var <- newTVarIO $ PoolOpen 0 Map.empty

    -- We use a different IORef for the weak ref instead of the var
    -- above since the reaper thread will always be holding onto a
    -- reference.
    alive <- newIORef ()
    void $ mkWeakIORef alive $ destroyKeyedPool' destroy var

    -- Make sure to fork _after_ we've established the mkWeakIORef. If
    -- we did it the other way around, it would be possible for an
    -- async exception to happen before our destroyKeyedPool' handler
    -- was installed, and then reap would have to rely on detecting an
    -- STM deadlock before it could ever exit. This way, the reap
    -- function will only start running when we're guaranteed that
    -- cleanup will be triggered.

    -- Ensure that we have a normal masking state in the new thread.
    _ <- forkIOWithUnmask $ \restore -> keepRunning $ restore $ reap destroy var
    return KeyedPool
        { kpCreate = create
        , kpDestroy = destroy
        , kpMaxPerKey = maxPerKey
        , kpMaxTotal = maxTotal
        , kpVar = var
        , kpAlive = alive
        }
  where
    -- Run the action; whenever it throws, hand the exception to
    -- onReaperException and start the action again. Finishes only
    -- when the action returns normally (or the handler itself throws).
    keepRunning action =
        loop
      where
        loop = action `catch` \e -> onReaperException e >> loop

-- | Close the pool and destroy whatever idle resources it was holding.
--
-- The state is atomically replaced with 'PoolClosed'; the previous
-- state is then traversed and every resource in it is destroyed, with
-- exceptions from the destroy action discarded.
destroyKeyedPool' :: (resource -> IO ())
                  -> TVar (PoolMap key resource)
                  -> IO ()
destroyKeyedPool' destroy var =
    atomically (swapTVar var PoolClosed) >>= F.mapM_ destroyQuietly
  where
    destroyQuietly = ignoreExceptions . destroy

-- | Run a reaper thread, which will destroy old resources. It will
-- stop running once our pool switches to PoolClosed, which is handled
-- via the mkWeakIORef in the creation of the pool.
--
-- Each pass first sleeps for five seconds, then removes from the pool
-- and destroys every idle resource whose timestamp is more than 30
-- seconds old. While the pool is open but holds nothing, the pass
-- waits inside the transaction (via 'retry') instead of polling.
reap :: forall key resource.
        Ord key
     => (resource -> IO ())
     -> TVar (PoolMap key resource)
     -> IO ()
reap destroy var =
    loop
  where
    loop = do
        -- threadDelay takes microseconds: this is a five second pause.
        threadDelay (5 * 1000 * 1000)
        -- The transaction only decides what to do and returns that
        -- decision as an IO action; 'join' then runs it outside STM.
        join $ atomically $ do
            m'' <- readTVar var
            case m'' of
                -- Pool was shut down: return a no-op, which ends the loop.
                PoolClosed -> return (return ())
                PoolOpen idleCount m
                    -- Nothing to reap: block until the TVar changes.
                    | Map.null m -> retry
                    | otherwise -> do
                        (m', toDestroy) <- findStale idleCount m
                        writeTVar var m'
                        return $ do
                            -- The stale resources are already out of the
                            -- pool; destroy them with async exceptions
                            -- masked, discarding any destroy failures.
                            mask_ (mapM_ (ignoreExceptions . destroy) toDestroy)
                            loop

    -- Split the pool contents into the state to keep and the list of
    -- resources to destroy, adjusting the idle count to match.
    findStale :: Int
              -> Map key (PoolList resource)
              -> STM (PoolMap key resource, [resource])
    findStale idleCount m = do
        -- We want to make sure to get the time _after_ any delays
        -- occur due to the retry call above. Since getCurrentTime has
        -- no side effects outside of the STM block, this is a safe
        -- usage.
        now <- unsafeIOToSTM getCurrentTime
        -- A resource is kept while its timestamp plus 30 seconds has
        -- not yet passed.
        let isNotStale time = 30 `addUTCTime` time >= now
        -- toKeep and toDestroy are accumulated as difference lists
        -- (functions that prepend), applied to [] at the end.
        let findStale' toKeep toDestroy [] =
                (Map.fromList (toKeep []), toDestroy [])
            findStale' toKeep toDestroy ((key, plist):rest) =
                findStale' toKeep' toDestroy' rest
              where
                -- Note: By definition, the timestamps must be in
                -- descending order, so we don't need to traverse the
                -- whole list.
                (notStale, stale) = span (isNotStale . fst) $ plistToList plist
                toDestroy' = toDestroy . (map snd stale++)
                -- A key whose entries are all stale is dropped from
                -- the map entirely.
                toKeep' =
                    case plistFromList notStale of
                        Nothing -> toKeep
                        Just x -> toKeep . ((key, x):)
        let (toKeep, toDestroy) = findStale' id id (Map.toList m)
        let idleCount' = idleCount - length toDestroy
        return (PoolOpen idleCount' toKeep, toDestroy)

-- | Check out a value from the 'KeyedPool' with the given key.
--
-- This function will internally call 'mask_' to ensure async safety,
-- and will return a value which uses weak references to ensure that
-- the value is cleaned up. However, if you want to ensure timely
-- resource cleanup, you should bracket this operation together with
-- 'managedRelease'.
--
-- If an idle resource exists for the key it is removed from the pool
-- and handed out; otherwise (including when the pool is closed) a new
-- resource is created with 'kpCreate'.
takeKeyedPool :: Ord key => KeyedPool key resource -> key -> IO (Managed resource)
takeKeyedPool kp key = mask_ $ join $ atomically $ do
    -- Inside the transaction: pop an idle resource for this key, if any.
    (m, mresource) <- fmap go $ readTVar (kpVar kp)
    writeTVar (kpVar kp) $! m
    -- Everything below runs in IO after the transaction has committed.
    return $ do
        resource <- maybe (kpCreate kp key) return mresource
        alive <- newIORef ()
        isReleasedVar <- newTVarIO False

        -- The flag is flipped atomically, so only the first call to
        -- release performs its action; later calls do nothing.
        let release action = mask_ $ do
                isReleased <- atomically $ swapTVar isReleasedVar True
                unless isReleased $
                    case action of
                        Reuse -> putResource kp key resource
                        DontReuse -> ignoreExceptions $ kpDestroy kp resource

        -- Finalizer on the liveness token: destroys the resource
        -- unless it has already been released.
        _ <- mkWeakIORef alive $ release DontReuse
        return Managed
            { _managedResource = resource
            , _managedReused = isJust mresource
            , _managedRelease = release
            , _managedAlive = alive
            }
  where
    -- Compute the new pool state and the resource taken out of it.
    go PoolClosed = (PoolClosed, Nothing)
    go pcOrig@(PoolOpen idleCount m) =
        case Map.lookup key m of
            Nothing -> (pcOrig, Nothing)
            -- Last idle resource for this key: drop the key.
            Just (One a _) ->
                (PoolOpen (idleCount - 1) (Map.delete key m), Just a)
            -- Take the front entry and keep the rest.
            Just (Cons a _ _ rest) ->
                (PoolOpen (idleCount - 1) (Map.insert key rest m), Just a)

-- | Try to return a resource to the pool. The resource is destroyed
-- instead when the pool is closed, when the pool already holds
-- 'kpMaxTotal' idle resources, or when keeping it would exceed
-- 'kpMaxPerKey' for its key (which includes any per-key limit of zero
-- or less).
--
-- Exceptions thrown by the destroy action are discarded, in line with
-- the contract documented on 'createKeyedPool' and with how the
-- 'DontReuse' release path in 'takeKeyedPool' treats them.
putResource :: Ord key => KeyedPool key resource -> key -> resource -> IO ()
putResource kp key resource = do
    -- Timestamp recorded with the resource; the reaper compares
    -- against it to decide when the resource has gone stale.
    now <- getCurrentTime
    -- The transaction updates the pool and returns the follow-up IO
    -- action (a destroy, or nothing), which 'join' runs afterwards.
    join $ atomically $ do
        (m, action) <- fmap (go now) (readTVar (kpVar kp))
        writeTVar (kpVar kp) $! m
        return action
  where
    -- Destroy a resource, swallowing whatever the destroy action throws.
    discard = ignoreExceptions . kpDestroy kp

    go _ PoolClosed = (PoolClosed, discard resource)
    go now pc@(PoolOpen idleCount m)
        | idleCount >= kpMaxTotal kp = (pc, discard resource)
        -- A per-key limit that allows nothing must also reject the
        -- first resource for a key, not only subsequent ones.
        | kpMaxPerKey kp <= 0 = (pc, discard resource)
        | otherwise = case Map.lookup key m of
            Nothing ->
                let cnt' = idleCount + 1
                    m' = PoolOpen cnt' (Map.insert key (One resource now) m)
                 in (m', return ())
            Just l ->
                -- mx is the leftover: 'Just' when the per-key list was
                -- already full and the resource was not added.
                let (l', mx) = addToList now (kpMaxPerKey kp) resource l
                    cnt' = idleCount + maybe 1 (const 0) mx
                    m' = PoolOpen cnt' (Map.insert key l' m)
                 in (m', maybe (return ()) discard mx)

-- | Push a new element onto the front of the list unless that would
-- take the list past the given maximum size. When the element does
-- not fit, the list is returned unchanged and the element is handed
-- back as the leftover.
addToList :: UTCTime -> Int -> a -> PoolList a -> (PoolList a, Maybe a)
addToList now maxCount x l
    | maxCount <= 1 = rejected
    | otherwise =
        case l of
            One{} -> accepted 2
            Cons _ currCount _ _
                | maxCount > currCount -> accepted (currCount + 1)
                | otherwise -> rejected
  where
    -- The list is already non-empty, so a maximum of one (or less)
    -- never leaves room for another element.
    rejected = (l, Just x)
    accepted newSize = (Cons x newSize now l, Nothing)

-- | A managed resource, which can be returned to the 'KeyedPool' when
-- work with it is complete. Using garbage collection, it will default
-- to destroying the resource if the caller does not explicitly use
-- 'managedRelease'.
data Managed resource = Managed
    { _managedResource :: !resource
      -- ^ The underlying resource.
    , _managedReused :: !Bool
      -- ^ 'True' when the resource came out of the pool rather than
      -- being freshly created.
    , _managedRelease :: !(Reuse -> IO ())
      -- ^ Release action; the argument selects between returning the
      -- resource to the pool and destroying it.
    , _managedAlive :: !(IORef ())
      -- ^ Liveness token read by 'keepAlive'. 'takeKeyedPool' attaches
      -- a weak finalizer to it that releases the resource.
    }

-- | Extract the underlying resource wrapped by a 'Managed' value.
managedResource :: Managed resource -> resource
managedResource Managed { _managedResource = raw } = raw

-- | 'True' if the resource was an idle one taken out of the pool,
-- 'False' if it was newly created.
managedReused :: Managed resource -> Bool
managedReused Managed { _managedReused = fromPool } = fromPool

-- | Give up the resource. Once released, the value obtained from
-- 'managedResource' must no longer be used. Pass 'Reuse' to hand the
-- resource back to the pool, or 'DontReuse' to have it destroyed.
managedRelease :: Managed resource -> Reuse -> IO ()
managedRelease managed = _managedRelease managed

-- | What 'managedRelease' should do with a resource: 'Reuse' returns
-- it to the pool, 'DontReuse' destroys it.
data Reuse = Reuse | DontReuse

-- | Wrap a bare resource in a 'Managed' value that is not connected
-- to any pool: it reports itself as not reused and releasing it does
-- nothing. For testing purposes only.
dummyManaged :: resource -> Managed resource
dummyManaged resource =
    Managed
        { _managedAlive = placeholder
        , _managedRelease = \_ -> return ()
        , _managedReused = False
        , _managedResource = resource
        }
  where
    -- No finalizer is attached to this reference here; it exists only
    -- so that the field has a value.
    placeholder = unsafePerformIO (newIORef ())

-- | Run an action, discarding any exception it throws.
ignoreExceptions :: IO () -> IO ()
ignoreExceptions action = catch action swallow
  where
    swallow :: SomeException -> IO ()
    swallow _ = return ()

-- | Keep the managed resource from being released before you are
-- done with it. This reads the value's liveness reference, so the
-- reference is still in use at the point of the call.
keepAlive :: Managed resource -> IO ()
keepAlive managed = readIORef (_managedAlive managed)