File: JobControl.hs

package info (click to toggle)
haskell-cabal-install 3.10.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,400 kB
  • sloc: haskell: 52,202; sh: 80; makefile: 9
file content (193 lines) | stat: -rw-r--r-- 6,953 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
{-# LANGUAGE DeriveDataTypeable #-}
module UnitTests.Distribution.Client.JobControl (tests) where

import Distribution.Client.JobControl

import Distribution.Client.Compat.Prelude
import Prelude ()

import Data.IORef (newIORef, atomicModifyIORef)
import Control.Monad (replicateM_, replicateM)
import Control.Concurrent (threadDelay)
import Control.Exception  (try)
import qualified Data.Set as Set

import Test.Tasty
import Test.Tasty.QuickCheck hiding (collect)


-- | All 'JobControl' properties, split into one group for the serial
-- implementation and one for the parallel implementation.
--
-- Every test inside a group carries a distinct name so that a failure
-- report (or a @-p@ pattern) identifies exactly one property.
tests :: [TestTree]
tests =
  [ testGroup "serial"
      [ testProperty "submit batch"       prop_submit_serial
      , testProperty "collect remaining"  prop_remaining_serial
      , testProperty "submit interleaved" prop_interleaved_serial
      , testProperty "concurrent jobs"    prop_concurrent_serial
      , testProperty "cancel"             prop_cancel_serial
      , testProperty "exceptions"         prop_exception_serial
      ]
  , testGroup "parallel"
      [ testProperty "submit batch"       prop_submit_parallel
      , testProperty "collect remaining"  prop_remaining_parallel
      , testProperty "submit interleaved" prop_interleaved_parallel
      , testProperty "concurrent jobs"    prop_concurrent_parallel
      , testProperty "cancel"             prop_cancel_parallel
      , testProperty "exceptions"         prop_exception_parallel
      ]
  ]


-- | Runs 'prop_submit' against a fresh job control obtained from
-- 'newSerialJobControl'.
prop_submit_serial :: [Int] -> Property
prop_submit_serial xs =
  ioProperty (newSerialJobControl >>= \jobCtl -> prop_submit jobCtl xs)

-- | Runs 'prop_submit' against a fresh job control obtained from
-- 'newParallelJobControl', using the generated positive job limit.
prop_submit_parallel :: Positive (Small Int) -> [Int] -> Property
prop_submit_parallel (Positive (Small maxJobLimit)) xs =
  ioProperty (newParallelJobControl maxJobLimit >>= \jobCtl -> prop_submit jobCtl xs)

-- | Runs 'prop_remaining' against a fresh job control obtained from
-- 'newSerialJobControl'.
prop_remaining_serial :: [Int] -> Property
prop_remaining_serial xs =
  ioProperty (newSerialJobControl >>= \jobCtl -> prop_remaining jobCtl xs)

-- | Runs 'prop_remaining' against a fresh job control obtained from
-- 'newParallelJobControl', using the generated positive job limit.
prop_remaining_parallel :: Positive (Small Int) -> [Int] -> Property
prop_remaining_parallel (Positive (Small maxJobLimit)) xs =
  ioProperty (newParallelJobControl maxJobLimit >>= \jobCtl -> prop_remaining jobCtl xs)

-- | Runs 'prop_submit_interleaved' against a fresh job control obtained
-- from 'newSerialJobControl'.
prop_interleaved_serial :: [Int] -> Property
prop_interleaved_serial xs =
  ioProperty (newSerialJobControl >>= \jobCtl -> prop_submit_interleaved jobCtl xs)

-- | Runs 'prop_submit_interleaved' against a fresh job control obtained
-- from 'newParallelJobControl', using the generated positive job limit.
prop_interleaved_parallel :: Positive (Small Int) -> [Int] -> Property
prop_interleaved_parallel (Positive (Small maxJobLimit)) xs =
  ioProperty $
    newParallelJobControl maxJobLimit >>= \jobCtl ->
      prop_submit_interleaved jobCtl xs

-- | Spawns one trivial job per input value, then collects exactly as many
-- results as jobs were spawned.
--
-- Holds when the collected results are a permutation of the inputs; the
-- order in which results come back is not checked.
prop_submit :: JobControl IO Int -> [Int] -> IO Bool
prop_submit jobCtl xs = do
    traverse_ (spawnJob jobCtl . return) xs
    collected <- replicateM (length xs) (collectJob jobCtl)
    return (sort collected == sort xs)

-- | Spawns one trivial job per input value, then drains the job control
-- with 'collectRemainingJobs' instead of counting collections.
--
-- Holds when the drained results are a permutation of the inputs.
prop_remaining :: JobControl IO Int -> [Int] -> IO Bool
prop_remaining jobCtl xs = do
    traverse_ (spawnJob jobCtl . return) xs
    (\drained -> sort xs == sort drained) <$> collectRemainingJobs jobCtl

-- | Keeps calling 'collectJob' for as long as 'remainingJobs' reports
-- 'True', and returns everything that was collected.
--
-- Each new result is consed onto the accumulator, so the returned list is
-- in reverse order of collection.
collectRemainingJobs :: Monad m => JobControl m a -> m [a]
collectRemainingJobs jobCtl = loop []
  where
    -- Ask whether anything is left, then decide what to do with the answer.
    loop acc = remainingJobs jobCtl >>= step acc

    step acc True  = collectJob jobCtl >>= \result -> loop (result : acc)
    step acc False = return acc

-- | Interleaves spawning and collecting on the same job control.
--
-- Step @i@ runs the @i@-th spawn action followed by the @i@-th collect
-- action. The first five collect slots are no-ops yielding 'Nothing', so
-- collection lags five steps behind spawning. Once the inputs run out the
-- spawn slots become no-ops, and the remaining steps only collect.
--
-- Holds when the collected 'Just' values are a permutation of the inputs.
prop_submit_interleaved :: JobControl IO (Maybe Int) -> [Int] -> IO Bool
prop_submit_interleaved jobCtl xs = do
    results <- sequenceA (zipWith (>>) spawns collects)
    return (sort xs == sort (catMaybes results))
  where
    -- Infinite: one real spawn per input, padded with no-ops. 'zipWith'
    -- stops at the end of the finite 'collects' list.
    spawns   = map (spawnJob jobCtl . return . Just) xs
            ++ repeat (return ())
    -- Finite: five placeholder steps, then one real collect per input.
    collects = replicate 5 (return Nothing)
            ++ map (const (collectJob jobCtl)) xs

-- | Checks that jobs on a serial job control never overlap.
--
-- Every job bumps a shared counter, sleeps briefly, and then decrements it
-- again, reporting the counter value seen just before each modification.
-- Without overlap every job must see @0@ before its increment and @1@
-- before its decrement.
prop_concurrent_serial :: NonNegative (Small Int) -> Property
prop_concurrent_serial (NonNegative (Small ntasks)) =
  ioProperty $ do
    jobCtl  <- newSerialJobControl
    running <- newIORef (0 :: Int)
    let probe = do
          before <- atomicModifyIORef running (\n -> (n + 1, n))
          threadDelay 100
          during <- atomicModifyIORef running (\n -> (n - 1, n))
          return (before, during)
    replicateM_ ntasks (spawnJob jobCtl probe)
    observed <- replicateM ntasks (collectJob jobCtl)
    return (length observed == ntasks && all (== (0, 1)) observed)

-- | Checks that a parallel job control respects its job limit.
--
-- Every job bumps a shared counter, sleeps briefly, and then decrements it
-- again, reporting the counter value seen just before each modification.
-- The property requires that:
--
-- * as many results come back as jobs were spawned;
-- * the value seen before the increment lies in @[0, maxJobLimit)@ and the
--   value seen before the decrement lies in @(0, maxJobLimit]@;
-- * when there are at least twice as many tasks as the limit, at least one
--   job observed the counter at exactly @maxJobLimit@.
prop_concurrent_parallel :: Positive (Small Int) -> NonNegative Int -> Property
prop_concurrent_parallel (Positive (Small maxJobLimit)) (NonNegative ntasks) =
  ioProperty $ do
    jobCtl  <- newParallelJobControl maxJobLimit
    running <- newIORef (0 :: Int)
    let probe = do
          before <- atomicModifyIORef running (\n -> (n + 1, n))
          threadDelay 100
          during <- atomicModifyIORef running (\n -> (n - 1, n))
          return (before, during)
    replicateM_ ntasks (spawnJob jobCtl probe)
    observed <- replicateM ntasks (collectJob jobCtl)
    let withinLimit (before, during) =
             before >= 0 && before <  maxJobLimit
          && during >  0 && during <= maxJobLimit
        -- we do hit the concurrency limit (in the right circumstances)
        hitsLimit
          | ntasks >= maxJobLimit * 2 -- give us enough of a margin
                      = any ((== maxJobLimit) . snd) observed
          | otherwise = True
    return (length observed == ntasks && all withinLimit observed && hitsLimit)

-- | Cancellation on a serial job control.
--
-- Spawns jobs for @xs ++ ys@, collects @length xs@ results, calls
-- 'cancelJobs', and then drains whatever is still reported as remaining.
-- Holds when the results collected before the cancel are a permutation of
-- @xs@ and nothing at all is collected after it.
prop_cancel_serial :: [Int] -> [Int] -> Property
prop_cancel_serial xs ys =
  ioProperty $ do
    jobCtl <- newSerialJobControl
    traverse_ (spawnJob jobCtl . return) (xs ++ ys)
    collected <- replicateM (length xs) (collectJob jobCtl)
    cancelJobs jobCtl
    leftover <- collectRemainingJobs jobCtl
    return (sort collected == sort xs && null leftover)

-- | Cancellation on a parallel job control.
--
-- Spawns briefly sleeping jobs for @xs ++ ys@, collects @length xs@
-- results, calls 'cancelJobs', and then drains whatever is still reported
-- as remaining. The check only requires that every value collected, before
-- or after the cancel, is one of the submitted values.
prop_cancel_parallel :: Positive (Small Int) -> [Int] -> [Int] -> Property
prop_cancel_parallel (Positive (Small maxJobLimit)) xs ys =
  ioProperty $ do
    jobCtl <- newParallelJobControl maxJobLimit
    traverse_ (\x -> spawnJob jobCtl (threadDelay 100 >> return x)) (xs ++ ys)
    beforeCancel <- replicateM (length xs) (collectJob jobCtl)
    cancelJobs jobCtl
    afterCancel <- collectRemainingJobs jobCtl
    let submitted = Set.fromList (xs ++ ys)
        seen      = Set.fromList (beforeCancel ++ afterCancel)
    return (seen `Set.isSubsetOf` submitted)

-- | Exception thrown by jobs in the exception properties. The 'Int'
-- payload lets a test match a caught exception to the input that caused it.
data TestException
  = TestException Int
  deriving (Show, Typeable)

-- | Uses the default 'Exception' methods.
instance Exception TestException

-- | Runs 'prop_exception' against a fresh job control obtained from
-- 'newSerialJobControl'.
prop_exception_serial :: [Either Int Int] -> Property
prop_exception_serial xs =
  ioProperty (newSerialJobControl >>= \jobCtl -> prop_exception jobCtl xs)

-- | Runs 'prop_exception' against a fresh job control obtained from
-- 'newParallelJobControl', using the generated positive job limit.
prop_exception_parallel :: Positive (Small Int) -> [Either Int Int] -> Property
prop_exception_parallel (Positive (Small maxJobLimit)) xs =
  ioProperty (newParallelJobControl maxJobLimit >>= \jobCtl -> prop_exception jobCtl xs)

-- | Spawns one job per input: a @Left n@ input becomes a job that throws
-- @TestException n@, and a @Right n@ input becomes a job that returns @n@.
--
-- It then collects once per input, catching any 'TestException' raised by
-- 'collectJob' and turning it back into a 'Left'. Holds when the observed
-- outcomes are a permutation of the inputs.
prop_exception :: JobControl IO Int -> [Either Int Int] -> IO Bool
prop_exception jobCtl xs = do
    traverse_ (spawnJob jobCtl . either (throwIO . TestException) return) xs
    outcomes <- traverse (const collectOutcome) xs
    return (sort xs == sort outcomes)
  where
    -- Collect one job, reifying a thrown 'TestException' as a 'Left'.
    collectOutcome :: IO (Either Int Int)
    collectOutcome = either unwrap Right <$> try (collectJob jobCtl)

    unwrap :: TestException -> Either Int Int
    unwrap (TestException n) = Left n