1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
|
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
import Criterion.Main
import Control.Concurrent (setNumCapabilities)
import Control.Concurrent.Async
import BroadcastChan
import qualified BroadcastChan.Throw as Throw
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.DeepSeq (NFData(..))
import Control.Monad (forM, guard, replicateM, void)
import qualified Control.Monad as Monad
import Data.Bifunctor (second)
import Data.Int (Int64)
import GHC.Conc (getNumProcessors)
import GHC.Generics (Generic)
instance NFData (BroadcastChan io a) where
rnf !_ = ()
instance NFData (IO a) where
rnf !_ = ()
replicateM_ :: Monad m => Int64 -> m a -> m ()
replicateM_ = Monad.replicateM_ . fromIntegral
splitEqual :: Integral a => a -> a -> [a]
splitEqual _ 0 = []
splitEqual total n =
replicate rest (base + 1) ++ replicate (fromIntegral n - rest) base
where
(base, rest) = second fromIntegral $ total `quotRem` n
data Config
= Config
{ writers :: Int
, readers :: Int
, numMsgs :: Int64
, broadcast :: Bool
}
data ChanOps
= ChanOps
{ putChan :: !(IO ())
, takeChan :: !(IO ())
, dupTake :: !(IO (IO ()))
} deriving (Generic)
instance NFData ChanOps
data ChanType
= Chan
{ chanName :: String
, canBroadcast :: Bool
, allocChan :: Int64 -> Int64 -> IO ChanOps
}
benchBChan :: ChanType
benchBChan = Chan "BroadcastChan" True $ \_size numMsgs -> do
chan <- newBroadcastChan
listener <- newBChanListener chan
replicateM_ numMsgs $ writeBChan chan ()
return ChanOps
{ putChan = void $ writeBChan chan ()
, takeChan = void $ readBChan listener
, dupTake = void . readBChan <$> newBChanListener chan
}
{-# INLINE benchBChan #-}
benchBChanExcept :: ChanType
benchBChanExcept = Chan "BroadcastChan.Throw" True $ \_size numMsgs -> do
chan <- newBroadcastChan
listener <- newBChanListener chan
replicateM_ numMsgs $ writeBChan chan ()
return ChanOps
{ putChan = Throw.writeBChan chan ()
, takeChan = Throw.readBChan listener
, dupTake = Throw.readBChan <$> newBChanListener chan
}
{-# INLINE benchBChanExcept #-}
benchBChanDrop :: ChanType
benchBChanDrop = Chan "BroadcastChan (drop)" False $ \_ _ -> do
chan <- newBroadcastChan
return ChanOps
{ putChan = void $ writeBChan chan ()
, takeChan = fail "Dropping BroadcastChan doesn't support reading."
, dupTake = fail "Dropping BroadcastChan doesn't support broadcasting."
}
{-# INLINE benchBChanDrop #-}
benchBChanDropExcept :: ChanType
benchBChanDropExcept = Chan "BroadcastChan.Throw (drop)" False $ \_ _ -> do
chan <- newBroadcastChan
return ChanOps
{ putChan = Throw.writeBChan chan ()
, takeChan = fail "Dropping BroadcastChan doesn't support reading."
, dupTake = fail "Dropping BroadcastChan doesn't support broadcasting."
}
{-# INLINE benchBChanDropExcept #-}
benchChan :: ChanType
benchChan = Chan "Chan" True $ \_size numMsgs -> do
chan <- newChan
replicateM_ numMsgs $ writeChan chan ()
return ChanOps
{ putChan = writeChan chan ()
, takeChan = readChan chan
, dupTake = readChan <$> dupChan chan
}
{-# INLINE benchChan #-}
benchTChan :: ChanType
benchTChan = Chan "TChan" True $ \_size numMsgs -> do
chan <- newTChanIO
replicateM_ numMsgs . atomically $ writeTChan chan ()
return ChanOps
{ putChan = atomically $ writeTChan chan ()
, takeChan = atomically $ readTChan chan
, dupTake = atomically . readTChan <$> atomically (dupTChan chan)
}
{-# INLINE benchTChan #-}
benchTQueue :: ChanType
benchTQueue = Chan "TQueue" False $ \_size numMsgs -> do
chan <- newTQueueIO
replicateM_ numMsgs . atomically $ writeTQueue chan ()
return ChanOps
{ putChan = atomically $ writeTQueue chan ()
, takeChan = atomically $ readTQueue chan
, dupTake = return (fail "TQueue doesn't support broadcasting")
}
{-# INLINE benchTQueue #-}
benchTBQueue :: ChanType
benchTBQueue = Chan "TBQueue" False $ \size numMsgs -> do
chan <- newTBQueueIO (fromIntegral size)
replicateM_ numMsgs . atomically $ writeTBQueue chan ()
return ChanOps
{ putChan = atomically $ writeTBQueue chan ()
, takeChan = atomically $ readTBQueue chan
, dupTake = return (fail "TBQueue doesn't support broadcasting")
}
{-# INLINE benchTBQueue #-}
benchWrites :: ChanType -> Benchmark
benchWrites Chan{..} =
bench chanName $ perBatchEnv (\i -> allocChan i 0) putChan
benchReads :: ChanType -> Benchmark
benchReads Chan{..} =
bench chanName $ perBatchEnv (\i -> allocChan i i) takeChan
benchConcurrent :: Config -> ChanType -> Benchmark
benchConcurrent Config{..} Chan{..} =
if broadcast && not canBroadcast
then bgroup "" []
else bench chanName $ perRunEnv setupConcurrent id
where
splitMsgs :: Integral a => a -> [Int64]
splitMsgs = splitEqual numMsgs . fromIntegral
preloadedMsgs :: Int64
preloadedMsgs
| writers == 0 = numMsgs
| otherwise = 0
launchReaders :: ChanOps -> IO [Async ()]
launchReaders ChanOps{..}
| broadcast = replicateM readers $ do
doTake <- dupTake
async $ replicateM_ numMsgs doTake
| otherwise = forM (splitMsgs readers) $ async . \n -> do
replicateM_ n takeChan
setupConcurrent :: IO (IO ())
setupConcurrent = do
start <- newEmptyMVar
chan@ChanOps{..} <- allocChan numMsgs preloadedMsgs
wThreads <- forM (splitMsgs writers) $ async . \n -> do
readMVar start
replicateM_ n putChan
rThreads <- launchReaders chan
return $ putMVar start () >> mapM_ wait (wThreads ++ rThreads)
{-# INLINE benchConcurrent #-}
runConcurrent
:: String -> [Int] -> [Int] -> [Int64] -> Bool -> [ChanType] -> Benchmark
runConcurrent typeName writerCounts readerCounts msgs broadcast chans =
bgroup typeName $ map makeBenchGroup threads
where
threads = do
ws <- writerCounts
rs <- readerCounts
guard $ (ws, rs) `notElem` [(0,0),(0,1),(1,0)]
return (ws, rs)
makeBenchGroup :: (Int, Int) -> Benchmark
makeBenchGroup (writers, readers) = bgroup groupName $ map mkBench msgs
where
groupName :: String
groupName
| writers == 0 = show readers ++ " readers"
| readers == 0 = show writers ++ " writers"
| otherwise = show writers ++ " to " ++ show readers
mkBench :: Int64 -> Benchmark
mkBench numMsgs =
bgroup name $ map (benchConcurrent Config{..}) chans
where
name = show numMsgs ++ " messages"
chanTypes :: [ChanType]
chanTypes =
[ benchBChan
, benchBChanExcept
, benchChan
, benchTChan
, benchTQueue
, benchTBQueue
]
writeChanTypes :: [ChanType]
writeChanTypes = [ benchBChanDrop, benchBChanDropExcept ] ++ chanTypes
main :: IO ()
main = do
getNumProcessors >>= setNumCapabilities
defaultMain
[ bgroup "Write" $ map benchWrites writeChanTypes
, bgroup "Read" $ map benchReads chanTypes
, bgroup "Concurrent"
[ runConcurrentWrites False writeChanTypes
, runConcurrentReads False chanTypes
, runConcurrentBench False chanTypes
]
, bgroup "Broadcast"
[ runConcurrentWrites True chanTypes
, runConcurrentReads True chanTypes
, runConcurrentBench True chanTypes
]
]
where
threads = [1,2,5,10,100,1000,10^4]
msgCounts = [10^4,10^5,10^6]
runConcurrentBench = runConcurrent "Read-Write" threads threads msgCounts
runConcurrentWrites = runConcurrent "Write" threads [0] msgCounts
runConcurrentReads = runConcurrent "Read" [0] threads msgCounts
|