File: DirectInternal.hs

package info (click to toggle)
haskell-monad-par 0.3.6-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 208 kB
  • sloc: haskell: 1,583; makefile: 19
file content (204 lines) | stat: -rw-r--r-- 6,841 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
{-# LANGUAGE PackageImports, CPP, GeneralizedNewtypeDeriving,
             DeriveDataTypeable #-}

-- | Type definition and some helpers.  This is used mainly by
-- Direct.hs but can also be used by other modules that want access to
-- the internals of the scheduler (i.e. the private `Par` type constructor).

module Control.Monad.Par.Scheds.DirectInternal where

#if !MIN_VERSION_base(4,6,0)
import Prelude hiding (catch)
#endif

import Control.Applicative
import "mtl" Control.Monad.Cont as C
import qualified "mtl" Control.Monad.Reader as RD
import "mtl" Control.Monad.Trans (liftIO)

import qualified System.Random.MWC as Random

import Control.Concurrent hiding (yield)
import GHC.Conc
import Data.IORef
import qualified Data.Set as S
import Data.Word (Word64)
import Data.Concurrent.Deque.Class (WSDeque)
import Control.Monad.Fix (MonadFix (mfix))
#if MIN_VERSION_base(4,9,0)
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
#else
import System.IO.Unsafe (unsafeInterleaveIO)
#endif

#ifdef USE_CHASELEV
#warning "Note: using Chase-Lev lockfree workstealing deques..."
import Data.Concurrent.Deque.ChaseLev.DequeInstance
import Data.Concurrent.Deque.ChaseLev as R
#endif

import Data.Typeable (Typeable)
import Control.Exception (Exception, throwIO, BlockedIndefinitelyOnMVar (..),
                          catch)

-- The scheduler's monad is a transformer stack over IO:
--
--      ---------
--        ContT      (outermost)
--       ReaderT
--         IO        (innermost)
--      ---------
--
-- The ReaderT layer carries the 'Sched' record: the API calls do not
-- receive the scheduler as an argument, so they retrieve it from the
-- environment instead.
--
-- The answer type of the continuation layer is unit.  Forked
-- computations return nothing.
--
type ROnly = RD.ReaderT Sched IO

newtype Par a = Par { unPar :: C.ContT () ROnly a }
    deriving ( Functor
             , Applicative
             , Monad
             , MonadCont
             , RD.MonadReader Sched
             )

-- Value recursion for 'Par' is delegated entirely to 'fixPar'.
instance MonadFix Par where
  mfix f = fixPar f

-- | Monadic fixpoint of a 'Par' computation; 'mfix' for 'Par' is defined
-- as this function.  The supplied function receives its own eventual
-- result as a lazy argument.  If that argument is demanded strictly
-- within the computation, 'FixParException' is thrown.
fixPar :: (a -> Par a) -> Par a
-- Implementation note: this follows the IO-style recipe for mfix rather
-- than the ST-style one so that the failure mode is a single, consistent
-- exception type.  With the ST-style recipe a strict argument could
-- surface *either* as a <<loop>> exception *or* (if the wrong sort of
-- computation gets re-run) as a "multiple-put" error.
fixPar body = Par $ ContT $ \k -> RD.ReaderT $ \sched -> do
  -- Starts empty; filled when the computation hands its result to the
  -- continuation (see 'deliver').
  resultVar <- newEmptyMVar
  let -- A read that fails with BlockedIndefinitelyOnMVar is reported as
      -- FixParException instead.  The exception value is never inspected.
      deadlocked :: BlockedIndefinitelyOnMVar -> IO b
      deadlocked _ = throwIO FixParException
  -- The read is interleaved lazily: it only runs once 'lazyResult' is
  -- actually demanded.
  lazyResult <- unsafeDupableInterleaveIO (readMVar resultVar `catch` deadlocked)
  let -- Publish the result first, then carry on with the caller's
      -- continuation.
      deliver a = do
        liftIO (putMVar resultVar a)
        k a
  RD.runReaderT (runContT (unPar (body lazyResult)) deliver) sched

#if !MIN_VERSION_base(4,9,0)
-- Fallback used only when the surrounding CPP guard selects it (base older
-- than 4.9): plain 'unsafeInterleaveIO' stands in under the newer name.
unsafeDupableInterleaveIO :: IO a -> IO a
unsafeDupableInterleaveIO action = unsafeInterleaveIO action
#endif

-- | Thrown by 'fixPar' in place of 'BlockedIndefinitelyOnMVar' when the
-- lazily read fixpoint result cannot be obtained.
data FixParException = FixParException
  deriving (Show, Typeable)

instance Exception FixParException

-- Sessions are identified by a 64-bit word.
type SessionID = Word64

-- A session: its ID together with a flag used to signal completion.
data Session
  = Session
      SessionID      -- identifier of the session
      (HotVar Bool)  -- completion flag

-- Scheduler state.  The first group of fields belongs to one worker; the
-- second group is global data.
data Sched = Sched
    { ---- Per worker ----

      -- NOTE(review): presumably this worker's number -- confirm against
      -- the code that builds the Sched records.
      no             :: {-# UNPACK #-} !Int

      -- Deque of 'Par ()' computations for this worker.
    , workpool       :: WSDeque (Par ())

      -- Random number generator used for work stealing.
    , rng            :: HotVar Random.GenIO

      -- Are we the main/master thread?
    , isMain         :: Bool

      -- The stack of nested sessions that THIS worker is participating in.
      -- When a session finishes, the worker can return to its Haskell
      -- calling context (its "real" continuation).
      --
      -- (1) The stack is always non-empty: it contains at least the root
      --     session corresponding to the anonymous system workers.
      -- (2) The original invocation of runPar also counts as a session
      --     and pushes a second session onto the stack.
      -- (3) Nested runPar invocations may push further sessions onto the
      --     stack.
    , sessions       :: HotVar [Session]

      ---- Global data: ----

      -- Waiting idle workers.
    , idle           :: HotVar [MVar Bool]

      -- A global list of schedulers.
    , scheds         :: [Sched]

      -- Any thread that enters runPar (original or nested) registers
      -- itself in this global set.  When the set becomes empty, worker
      -- threads may shut down or at least go idle.
    , activeSessions :: HotVar (S.Set SessionID)

      -- A counter to support unique session IDs.
    , sessionCounter :: HotVar SessionID
    }


--------------------------------------------------------------------------------
-- Helpers #1:  Atomic Variables
--------------------------------------------------------------------------------
-- TEMP: Experimental

#ifndef HOTVAR
#define HOTVAR 1
#endif
-- Common interface of the HotVar backends.  Exactly one backend is
-- compiled in, chosen by the HOTVAR macro.  Every operation is marked
-- INLINE.

-- Create a variable holding the given initial value.
newHotVar      :: a -> IO (HotVar a)
{-# INLINE newHotVar     #-}

-- Apply the function to the current contents; store the first component
-- of its result and return the second.
modifyHotVar   :: HotVar a -> (a -> (a,b)) -> IO b
{-# INLINE modifyHotVar  #-}

-- Replace the contents with the function applied to them.
modifyHotVar_  :: HotVar a -> (a -> a) -> IO ()
{-# INLINE modifyHotVar_ #-}

-- Read the current contents.
readHotVar     :: HotVar a -> IO a
{-# INLINE readHotVar    #-}

-- Overwrite the contents.
writeHotVar    :: HotVar a -> a -> IO ()
{-# INLINE writeHotVar   #-}

-- readHotVarRaw and writeHotVarRaw are defined by each backend as well,
-- but they have no shared signature here: they run in IO for the IORef
-- and MVar backends and in STM for the TVar backend.


#if HOTVAR == 1
-- Backend 1 (selected when HOTVAR is left undefined): a HotVar is a plain
-- IORef, modified through atomicModifyIORef.
type HotVar a = IORef a

newHotVar    = newIORef
readHotVar   = readIORef
writeHotVar  = writeIORef
modifyHotVar = atomicModifyIORef

-- Same as modifyHotVar, with a unit result paired onto the new contents.
modifyHotVar_ ref update = atomicModifyIORef ref step
  where
    step old = (update old, ())

-- The contents are not shown; a fixed placeholder is printed instead.
instance Show (IORef a) where
  show _ = "<ioref>"

-- This backend has no transactions: evaluating hotVarTransaction is an
-- error.
-- hotVarTransaction = id
hotVarTransaction = error "Transactions not currently possible for IO refs"

-- The "raw" accessors are ordinary IORef reads and writes.
readHotVarRaw :: HotVar a -> IO a
readHotVarRaw = readIORef

writeHotVarRaw :: HotVar a -> a -> IO ()
writeHotVarRaw = writeIORef


#elif HOTVAR == 2
#warning "Using MVars for hot atomic variables."
-- Backend 2: a HotVar is an MVar that is always full with *something*.
type HotVar a = MVar a

-- 'newMVar' takes the initial value and returns an MVar that is already
-- full, so no separate putMVar is needed (or possible without blocking).
newHotVar = newMVar

-- modifyMVar / modifyMVar_ expect an IO-returning function, hence the
-- composition with 'return'.
modifyHotVar  v fn = modifyMVar  v (return . fn)
modifyHotVar_ v fn = modifyMVar_ v (return . fn)

readHotVar    = readMVar

-- swapMVar returns the previous contents; they are deliberately dropped.
writeHotVar v x = do _ <- swapMVar v x; return ()

-- The contents are not shown; a fixed placeholder is printed instead.
instance Show (MVar a) where
  show _ref = "<mvar>"

-- hotVarTransaction = id
-- We could in theory do this by taking the mvar to grab the lock.
-- But we'd need some temporary storage....
hotVarTransaction = error "Transactions not currently possible for MVars"

-- The "raw" accessors are the same as the ordinary ones for this backend.
readHotVarRaw  = readHotVar
writeHotVarRaw = writeHotVar


#elif HOTVAR == 3
#warning "Using TVars for hot atomic variables."
-- Backend 3: a HotVar is a TVar, and each operation below runs as its own
-- STM transaction.  Simon Marlow said he saw better scaling with TVars
-- (a surprise to the original author).
type HotVar a = TVar a

newHotVar = newTVarIO

readHotVar var = atomically (readTVar var)

writeHotVar var new = atomically (writeTVar var new)

-- Read, compute the (new contents, result) pair, store the first
-- component and return the second, all within one transaction.
modifyHotVar var update = atomically $ do
  old <- readTVar var
  let result = update old
  writeTVar var (fst result)
  return (snd result)

modifyHotVar_ var update = atomically (readTVar var >>= writeTVar var . update)

-- The contents are not shown; a fixed placeholder is printed instead.
instance Show (TVar a) where
  show _ = "<tvar>"

-- With TVars a transaction is simply an STM block, and the "raw"
-- accessors are the STM primitives meant to be used inside one.
hotVarTransaction = atomically
readHotVarRaw     = readTVar
writeHotVarRaw    = writeTVar

#endif