1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
{-# LANGUAGE TemplateHaskell, DeriveDataTypeable #-}
{-# OPTIONS_GHC -O2 -ddump-splices #-}
-- Simple tests of distributed work stealing.
----------------------------------------------
import System.Environment (getArgs)
import qualified Control.Monad.Par.Meta.Dist as D
import Control.Monad.IO.Class (liftIO)
import Control.Monad (mapM_)
-- Tweaked version of CloudHaskell's closures:
import Remote2.Call (mkClosureRec, remotable)
import Control.Concurrent (myThreadId, threadDelay)
import System.Process (readProcess)
import System.Posix.Process (getProcessID)
import Data.Char (isSpace)
import Data.Typeable
import Data.Binary
import qualified Data.ByteString.Char8 as BS
-- import qualified Data.IntMap as M
-- import qualified Data.Set as M
--------------------------------------------------------------------------------
-- A description of a fake work topology which is executed by sleeping
-- (threadDelay) different amounts at different points.
-- Presently this describes futures-only work:
data FakeWork = Work Time FakeWork
| Fork FakeWork FakeWork
| SyncAll FakeWork
| Nop
-- | Spawn Id FakeWork FakeWork
-- | Seq [FakeWork]
-- | Sync Id FakeWork
deriving (Eq,Show,Read,Typeable)
-- All constructors take a continuation.
type Time = Int -- Time in milliseconds
type Id = Int
-- A simple test that ensures a single steal for two workers:
-- t1 = Spawn 11 (Work 101) $
-- Seq [Work 102,
-- Sync 11]
t1 = Fork (Work 101 Nop) $
Work 102 $
SyncAll Nop
t2 = switcher 3 3 True
-- This one alternates where the *real* work will be.
--
-- Takes number of switches. Performs a shrinking number of work
-- items per "straightaway":
switcher 1 m _ = Work 100 (SyncAll Nop)
switcher _ _ False = Work 101 (SyncAll Nop)
switcher n m True = Work 102 $ switcher n (m-1) True
switcher n 1 b = Fork (switcher (n-1) n b)
(switcher (n-1) n (not b))
----------------------------------------------------------------------------------------------------
runTest :: FakeWork -> D.Par ()
--runTest fw = theloop fw M.empty
runTest fw = theloop fw []
theloop fw ls =
case fw of
Nop -> return ()
Work t cont ->
do liftIO$ threadDelay (t * 1000)
liftIO$ putStrLn$ "Finished work "++show t
theloop cont ls
-- Seq ls -> mapM_ theloop ls
-- Spawn id fw cont ->
Fork child cont ->
do
iv <- D.longSpawn$ $(mkClosureRec 'runTest) child
-- Associate id with ivar
-- theloop cont (M.insert id iv mp)
theloop cont (iv:ls)
SyncAll cont ->
do mapM_ D.get ls
theloop cont []
-- Generate stub code for RPC:
remotable ['runTest]
-- instance Serialize FakeWork where
instance Binary FakeWork where
put fw = put (show fw)
-- get str = return (read (BS.unpack str))
get = get >>= return . read
----------------------------------------------------------------------------------------------------
main = do
[version] <- getArgs
case version of
"slave" -> D.runParSlaveWithTransport [__remoteCallMetaData] D.TCP
"master" -> do
D.runParDistWithTransport [__remoteCallMetaData] D.TCP (runTest t1)
putStrLn $ "Finished with work. Calling SHUTDOWN..."
D.shutdownDist
putStrLn $ "... returned from shutdown, apparently successful."
str -> error$"Unhandled mode: " ++ str
|