File: Parallel.hs

package info (click to toggle)
haskell-tasty 0.10-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 188 kB
  • sloc: haskell: 1,362; makefile: 2
file content (144 lines) | stat: -rw-r--r-- 5,066 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
-- | A helper module which takes care of parallelism
{-# LANGUAGE DeriveDataTypeable #-}
module Test.Tasty.Parallel (runInParallel) where

import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Foreign.StablePtr
import Data.Typeable
import GHC.Conc (labelThread)

data Interrupt = Interrupt
  deriving Typeable
instance Show Interrupt where
  show Interrupt = "interrupted"
instance Exception Interrupt

data ParThreadKilled = ParThreadKilled SomeException
  deriving Typeable
instance Show ParThreadKilled where
  show (ParThreadKilled exn) =
    "tasty: one of the test running threads was killed by: " ++
    show exn
instance Exception ParThreadKilled

shutdown :: ThreadId -> IO ()
shutdown = flip throwTo Interrupt

-- | Take a list of actions and execute them in parallel, no more than @n@
-- at the same time.
--
-- The action itself is asynchronous, ie. it returns immediately and does
-- the work in new threads. It returns an action which aborts tests and
-- cleans up.
runInParallel
  :: Int -- ^ maximum number of parallel threads
  -> [IO ()] -- ^ list of actions to execute
  -> IO (IO ())
-- This implementation tries its best to ensure that exceptions are
-- properly propagated to the caller and threads are not left running.
--
-- Note that exceptions inside tests are already caught by the test
-- actions themselves. Any exceptions that reach this function or its
-- threads are by definition unexpected.
runInParallel nthreads actions = do
  callingThread <- myThreadId

  -- Don't let the main thread be garbage-collected
  -- Otherwise we may get a "thread blocked indefinitely in an STM
  -- transaction" exception when a child thread is blocked and GC'd.
  -- (See e.g. https://github.com/feuerbach/tasty/issues/15)
  _ <- newStablePtr callingThread

  -- A variable containing all ThreadIds of forked threads.
  --
  -- These are the threads we'll need to kill if something wrong happens.
  pidsVar <- atomically $ newTVar []

  -- If an unexpected exception has been thrown and we started killing all
  -- the spawned threads, this flag will be set to False, so that any
  -- freshly spawned threads will know to terminate, even if their pids
  -- didn't make it to the "kill list" yet.
  aliveVar <- atomically $ newTVar True

  let
    -- Kill all threads.
    shutdownAll :: IO ()
    shutdownAll = do
      pids <- atomically $ do
        writeTVar aliveVar False
        readTVar pidsVar

      -- be sure not to kill myself!
      me <- myThreadId
      mapM_ shutdown $ filter (/= me) pids

    cleanup :: Either SomeException () -> IO ()
    cleanup Right {} = return ()
    cleanup (Left exn)
      | Just Interrupt <- fromException exn
        -- I'm being shut down either by a fellow thread (which caught an
        -- exception), or by the main thread which decided to stop running
        -- tests. In any case, just end silently.
        = return ()
      | otherwise = do
        -- Wow, I caught an exception (most probably an async one,
        -- although it doesn't really matter). Shut down all other
        -- threads, and re-throw my exception to the calling thread.
        shutdownAll
        throwTo callingThread $ ParThreadKilled exn

    forkCarefully :: IO () -> IO ThreadId
    forkCarefully action = flip myForkFinally cleanup $ do
      -- We cannot check liveness and update the pidsVar in one
      -- transaction before forking, because we don't know the new pid yet.
      --
      -- So we fork and then check/update. If something has happened in
      -- the meantime, it's not a big deal — we just cancel. OTOH, if
      -- we're alive at the time of the transaction, then we add our pid
      -- and will be killed when something happens.
      newPid <- myThreadId

      join . atomically $ do
        alive <- readTVar aliveVar
        if alive
          then do
            modifyTVar pidsVar (newPid :)
            return action
          else
            return (return ())

  capsVar <- atomically $ newTVar nthreads

  let
    go a cont = join . atomically $ do
      caps <- readTVar capsVar
      if caps > 0
        then do
          writeTVar capsVar $! caps - 1
          let
            release = atomically $ modifyTVar' capsVar (+1)

          -- Thanks to our exception handling, we won't deadlock even if
          -- an exception strikes before we 'release'. Everything will be
          -- killed, so why bother.
          return $ do
            pid <- forkCarefully (do a; release)
            labelThread pid "tasty_test_thread"
            cont

        else retry

  -- fork here as well, so that we can move to the UI without waiting
  -- untill all tests have finished
  pid <- forkCarefully $ foldr go (return ()) actions
  labelThread pid "tasty_thread_manager"
  return shutdownAll

-- Copied from base to stay compatible with GHC 7.4.
myForkFinally :: IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
myForkFinally action and_then =
  mask $ \restore ->
    forkIO $ try (restore action) >>= and_then