1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
{-# LANGUAGE RecordWildCards #-}
-- create a similar concept than a unix pipe.
module PipeChan (
PipeChan (..),
newPipe,
runPipe,
readPipeC,
readPipeS,
writePipeC,
writePipeS,
) where
import Control.Concurrent
import Control.Monad (forever)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.IORef
----------------------------------------------------------------
-- | represent a unidirectional pipe with a buffered read channel and
-- a write channel
data UniPipeChan = UniPipeChan
{ getReadUniPipe :: Chan ByteString
, getWriteUniPipe :: Chan ByteString
}
newUniPipeChan :: IO UniPipeChan
newUniPipeChan = UniPipeChan <$> newChan <*> newChan
runUniPipe :: UniPipeChan -> IO ThreadId
runUniPipe UniPipeChan{..} =
forkIO $
forever $
readChan getReadUniPipe >>= writeChan getWriteUniPipe
----------------------------------------------------------------
-- | Represent a bidirectional pipe with 2 nodes A and B
data PipeChan = PipeChan
{ fromC :: IORef ByteString
, fromS :: IORef ByteString
, c2s :: UniPipeChan
, s2c :: UniPipeChan
}
newPipe :: IO PipeChan
newPipe =
PipeChan
<$> newIORef B.empty
<*> newIORef B.empty
<*> newUniPipeChan
<*> newUniPipeChan
runPipe :: PipeChan -> IO (ThreadId, ThreadId)
runPipe PipeChan{..} = (,) <$> runUniPipe c2s <*> runUniPipe s2c
readPipeC :: PipeChan -> Int -> IO ByteString
readPipeC PipeChan{..} sz = readBuffered fromS (getWriteUniPipe s2c) sz
writePipeC :: PipeChan -> ByteString -> IO ()
writePipeC PipeChan{..} = writeChan $ getWriteUniPipe c2s
readPipeS :: PipeChan -> Int -> IO ByteString
readPipeS PipeChan{..} sz = readBuffered fromC (getWriteUniPipe c2s) sz
writePipeS :: PipeChan -> ByteString -> IO ()
writePipeS PipeChan{..} = writeChan $ getReadUniPipe s2c
-- helper to read buffered data.
readBuffered :: IORef ByteString -> Chan ByteString -> Int -> IO ByteString
readBuffered ref chan sz = do
left <- readIORef ref
if B.length left >= sz
then do
let (ret, nleft) = B.splitAt sz left
writeIORef ref nleft
return ret
else do
let newSize = sz - B.length left
newData <- readChan chan
writeIORef ref newData
remain <- readBuffered ref chan newSize
return (left `B.append` remain)
|