1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
|
{- Worker thread pool.
-
- Copyright 2019 Joey Hess <id@joeyh.name>
-
- Licensed under the GNU AGPL version 3 or higher.
-}
module Types.WorkerPool where
import Control.Concurrent
import Control.Concurrent.Async
import qualified Data.Set as S
-- | Pool of worker threads.
--
-- Every worker is assigned to a 'WorkerStage'. The pool also carries
-- spare values of type @t@, normally one per idle worker.
data WorkerPool t = WorkerPool
  { usedStages :: UsedStages
  -- ^ The stages this pool was allocated for.
  , workerList :: [Worker t]
  -- ^ Every worker in the pool, whether idle or active.
  , spareVals :: [t]
  -- ^ Normally there is one value for each IdleWorker,
  -- but there can temporarily be fewer values, when a thread is
  -- changing between stages.
  }
-- | Debug rendering: the stages, the workers, and how many spare values
-- are currently held (the values themselves need not be showable).
instance Show (WorkerPool t) where
  show p = unwords ("WorkerPool" : fields)
    where
      fields =
        [ show (usedStages p)
        , show (workerList p)
        , show (length (spareVals p))
        ]
-- | A worker can either be idle or running an Async action.
-- And it is used for some stage.
data Worker t
  = IdleWorker WorkerStage
  -- ^ Not running anything; assigned to the given stage.
  | ActiveWorker (Async t) WorkerStage
  -- ^ Running the given Async action, in the given stage.
-- | Shows the constructor name and the stage; the Async handle itself
-- is omitted.
instance Show (Worker t) where
  show w = case w of
    IdleWorker stage -> "IdleWorker " ++ show stage
    ActiveWorker _ stage -> "ActiveWorker " ++ show stage
-- | The stage a 'Worker' is assigned to.
data WorkerStage
  = StartStage
  -- ^ All threads start in this stage, and then transition away from
  -- it to the initialStage when they begin doing work. This should
  -- never be included in UsedStages, because transition from some
  -- other stage back to this one could result in a deadlock.
  | PerformStage
  -- ^ Running a CommandPerform action.
  | CleanupStage
  -- ^ Running a CommandCleanup action.
  | TransferStage
  -- ^ Transferring content to or from a remote.
  | VerifyStage
  -- ^ Verifying content, eg by calculating a checksum.
  deriving (Show, Eq, Ord)
-- | Set of stages that make sense to be used while performing an action,
-- and the stage to use initially.
--
-- Transitions between these stages will block as needed until there's a
-- free Worker in the pool for the new stage.
--
-- Actions that indicate they are in some other stage won't change the
-- stage, and so there will be no blocking before starting them.
data UsedStages = UsedStages
  { initialStage :: WorkerStage
  -- ^ The stage to use initially.
  , stageSet :: S.Set WorkerStage
  -- ^ The stages that transitions block on.
  }
  deriving (Show)
-- | Whether the given stage is one of the stages in the 'stageSet'.
memberStage :: WorkerStage -> UsedStages -> Bool
memberStage s = S.member s . stageSet
-- | The default: use only 'PerformStage' (for CommandPerform actions) and
-- 'CleanupStage' (for CommandCleanup actions), starting in 'PerformStage'.
--
-- Since cleanup actions often don't contend much with perform actions,
-- keeping them in separate stages prevents blocking the start of the next
-- perform action on finishing the previous cleanup action.
commandStages :: UsedStages
commandStages = UsedStages
  { initialStage = start
  , stageSet = S.fromList [start, CleanupStage]
  }
  where
    start = PerformStage
-- | Stages for a command that downloads content: 'TransferStage' followed
-- by 'VerifyStage', starting in 'TransferStage'.
--
-- Downloads are often bottlenecked on the network or on a disk other than
-- the one containing the repository, while verification bottlenecks on
-- the repository's disk or on the CPU. So the transfer and verify stages
-- are run separately.
downloadStages :: UsedStages
downloadStages = UsedStages
  { initialStage = start
  , stageSet = S.fromList [start, VerifyStage]
  }
  where
    start = TransferStage
-- | The stage a worker is assigned to, whether it is idle or active.
workerStage :: Worker t -> WorkerStage
workerStage w = case w of
  ActiveWorker _ stage -> stage
  IdleWorker stage -> stage
-- | The Async an active worker is running; 'Nothing' for an idle worker.
workerAsync :: Worker t -> Maybe (Async t)
workerAsync w = case w of
  ActiveWorker running _ -> Just running
  IdleWorker _ -> Nothing
-- | Allocates a WorkerPool with @n@ idle workers for each stage: 'StartStage'
-- plus every stage in the 'stageSet'.
--
-- The stages are interleaved round-robin through the worker list, and
-- one copy of @t@ is placed in 'spareVals' for each worker.
allocateWorkerPool :: t -> Int -> UsedStages -> WorkerPool t
allocateWorkerPool t n u =
  let stages = StartStage : S.toList (stageSet u)
      -- stages is never empty (it starts with StartStage), so cycle is safe.
      count = n * length stages
  in WorkerPool
      { usedStages = u
      , workerList = IdleWorker <$> take count (cycle stages)
      , spareVals = replicate count t
      }
-- | Adds a worker to the front of the pool's worker list. 'spareVals'
-- is left unchanged.
addWorkerPool :: Worker t -> WorkerPool t -> WorkerPool t
addWorkerPool w pool =
  let existing = workerList pool
  in pool { workerList = w : existing }
-- | Removes a worker from the pool whose Async uses the ThreadId.
--
-- Returns the removed worker's Async and stage together with the updated
-- pool, or 'Nothing' when no active worker runs in that thread. Idle
-- workers never match.
--
-- Each Async has its own ThreadId, so this stops once it finds
-- a match. The remaining workers keep their original order; the previous
-- accumulator-based version reversed every worker preceding the match,
-- scrambling the stage interleaving set up by allocateWorkerPool.
removeThreadIdWorkerPool :: ThreadId -> WorkerPool t -> Maybe ((Async t, WorkerStage), WorkerPool t)
removeThreadIdWorkerPool tid pool =
  case break runsInThread (workerList pool) of
    (before, ActiveWorker a stage : after) ->
      Just ((a, stage), pool { workerList = before ++ after })
    -- No match (break only stops on an ActiveWorker for tid).
    _ -> Nothing
  where
    runsInThread (ActiveWorker a _) = asyncThreadId a == tid
    runsInThread (IdleWorker _) = False
-- | Marks the first active worker running the given Async as idle,
-- keeping its stage, and adds the value to 'spareVals'.
--
-- The value is added to 'spareVals' even when no worker matches.
-- Workers other than the one matched are left unchanged, in order.
deactivateWorker :: WorkerPool t -> Async t -> t -> WorkerPool t
deactivateWorker pool aid t = pool
  { workerList = idleMatching (workerList pool)
  , spareVals = t : spareVals pool
  }
  where
    idleMatching ws = case break isTarget ws of
      (before, ActiveWorker _ st : after) -> before ++ IdleWorker st : after
      _ -> ws
    isTarget (ActiveWorker a _) = a == aid
    isTarget (IdleWorker _) = False
-- | True when no worker is active and every worker has its spare value
-- back in the pool.
allIdle :: WorkerPool t -> Bool
allIdle pool = noneActive && everyWorkerHasVal
  where
    noneActive = not (any isActive (workerList pool))
    isActive (ActiveWorker _ _) = True
    isActive (IdleWorker _) = False
    -- If this does not hold, a thread must be transitioning between
    -- stages, so it's not really idle.
    everyWorkerHasVal = length (spareVals pool) == length (workerList pool)
|