File: ThreadPool.hs

package info (click to toggle)
haskell-test-framework 0.6-1
  • links: PTS, VCS
  • area: main
  • in suites: wheezy
  • size: 176 kB
  • sloc: haskell: 928; makefile: 2
file content (104 lines) | stat: -rw-r--r-- 6,012 bytes parent folder | download | duplicates (7)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
module Test.Framework.Runners.ThreadPool (
        executeOnPool
    ) where

import Control.Concurrent
import Control.Monad

import qualified Data.IntMap as IM

import Foreign.StablePtr


data WorkerEvent token a = WorkerTermination
                         | WorkerItem token a

-- | Execute IO actions on several threads and return their results in the original
-- order.  It is guaranteed that no action from the input list is executed unless all
-- the items that precede it in the list have been executed or are executing at that
-- moment.
executeOnPool :: Int    -- ^ Number of threads to use
              -> [IO a] -- ^ Actions to execute: these will be scheduled left to right
              -> IO [a] -- ^ Ordered results of executing the given IO actions in parallel
executeOnPool n actions = do
    -- Prepare the channels
    input_chan <- newChan
    output_chan <- newChan
    
    -- Write the actions as items to the channel followed by one termination per thread
    -- that indicates they should terminate. We do this on another thread for
    -- maximum laziness (in case one the actions we are going to run depend on the
    -- output from previous actions..)
    _ <- forkIO $ writeList2Chan input_chan (zipWith WorkerItem [0..] actions ++ replicate n WorkerTermination)
    
    -- Spawn workers
    forM_ [1..n] (const $ forkIO $ poolWorker input_chan output_chan)

    -- Short version: make sure we do the right thing if a test blocks on dead
    -- MVars or TVars.
    -- Long version: GHC is clever enough to throw an exception (BlockedOnDeadMVar
    -- or BlockedIndefinitely) when a thread is waiting for a MVar or TVar that can't
    -- be written to. However, it doesn't know anything about the handlers for those
    -- exceptions. Therefore, when a worker runs a test that causes this exception,
    -- since the main thread is blocking on the worker, the main thread gets the
    -- exception too despite the fact that the main thread will be runnable as soon
    -- as the worker catches its own exception. The below makes sure the main thread
    -- is always reachable by the GC, which is the mechanism for finding threads
    -- that are unrunnable.
    -- See also the ticket where SimonM (semi-cryptically) explains this:
    -- http://hackage.haskell.org/trac/ghc/ticket/3291
    --
    -- NB: this actually leaks stable pointers. We could prevent this by making
    -- takeWhileWorkersExist do |unsafePerformIO newStablePtr| when returning the
    -- lazily-demanded tail of the list, but its a bit of a pain. For now, just
    -- grit our teeth and accept the leak.
    _stablePtr <- myThreadId >>= newStablePtr
    
    -- Return the results generated by the worker threads lazily and in
    -- the same order as we got the inputs
    fmap (reorderFrom 0 . takeWhileWorkersExist n) $ getChanContents output_chan

poolWorker :: Chan (WorkerEvent token (IO a)) -> Chan (WorkerEvent token a) -> IO ()
poolWorker input_chan output_chan = do
    -- Read an action and work out whether we should continue or stop
    action_item <- readChan input_chan
    case action_item of
        WorkerTermination -> writeChan output_chan WorkerTermination -- Must have run out of real actions to execute
        WorkerItem token action -> do
            -- Do the action then loop
            result <- action
            writeChan output_chan (WorkerItem token result)
            poolWorker input_chan output_chan

-- | Keep grabbing items out of the infinite list of worker outputs until we have
-- recieved word that all of the workers have shut down.  This lets us turn a possibly
-- infinite list of outputs into a certainly finite one suitable for use with reorderFrom.
takeWhileWorkersExist :: Int -> [WorkerEvent token a] -> [(token, a)]
takeWhileWorkersExist worker_count events
  | worker_count <= 0 = []
  | otherwise         = case events of
                            (WorkerTermination:events')  -> takeWhileWorkersExist (worker_count - 1) events'
                            (WorkerItem token x:events') -> (token, x) : takeWhileWorkersExist worker_count events'
                            []                           -> []

-- | This function carefully shuffles the input list so it in the total order
-- defined by the integers paired with the elements.  If the list is @xs@ and
-- the supplied initial integer is @n@, it must be the case that:
--
-- > sort (map fst xs) == [n..n + (length xs - 1)]
--
-- This function returns items in the lazy result list as soon as it is sure
-- it has the right item for that position.
reorderFrom :: Int -> [(Int, a)] -> [a]
reorderFrom from initial_things = go from initial_things IM.empty False
  where go next [] buf _
          | IM.null buf = []    -- If the buffer and input list is empty, we're done
          | otherwise   = go next (IM.toList buf) IM.empty False    -- Make sure we check the buffer even if the list is done
        go next all_things@((token, x):things) buf buf_useful
          | token == next                       -- If the list token matches the one we were expecting we can just take the item
          = x : go (next + 1) things buf True   -- Always worth checking the buffer now because the expected item has changed
          | buf_useful                                                              -- If it's worth checking the buffer, it's possible the token we need is in it
          , (Just x', buf') <- IM.updateLookupWithKey (const $ const Nothing) next buf  -- Delete the found item from the map (if we find it) to save space
          = x' : go (next + 1) all_things buf' True                                     -- Always worth checking the buffer now because the expected item has changed
          | otherwise                                       -- Token didn't match, buffer unhelpful: it must be in the tail of the list
          = go next things (IM.insert token x buf) False    -- Since we've already checked the buffer, stop bothering to do so until something changes -}