File: PipeChan.hs

package info (click to toggle)
haskell-tls 2.1.8-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,056 kB
  • sloc: haskell: 15,695; makefile: 3
file content (85 lines) | stat: -rw-r--r-- 2,423 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
{-# LANGUAGE RecordWildCards #-}

-- create a similar concept than a unix pipe.
module PipeChan (
    PipeChan (..),
    newPipe,
    runPipe,
    readPipeC,
    readPipeS,
    writePipeC,
    writePipeS,
) where

import Control.Concurrent
import Control.Monad (forever)
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.IORef

----------------------------------------------------------------

-- | represent a unidirectional pipe with a buffered read channel and
-- a write channel
data UniPipeChan = UniPipeChan
    { getReadUniPipe :: Chan ByteString
    , getWriteUniPipe :: Chan ByteString
    }

newUniPipeChan :: IO UniPipeChan
newUniPipeChan = UniPipeChan <$> newChan <*> newChan

runUniPipe :: UniPipeChan -> IO ThreadId
runUniPipe UniPipeChan{..} =
    forkIO $
        forever $
            readChan getReadUniPipe >>= writeChan getWriteUniPipe

----------------------------------------------------------------

-- | Represent a bidirectional pipe with 2 nodes A and B
data PipeChan = PipeChan
    { fromC :: IORef ByteString
    , fromS :: IORef ByteString
    , c2s :: UniPipeChan
    , s2c :: UniPipeChan
    }

newPipe :: IO PipeChan
newPipe =
    PipeChan
        <$> newIORef B.empty
        <*> newIORef B.empty
        <*> newUniPipeChan
        <*> newUniPipeChan

runPipe :: PipeChan -> IO (ThreadId, ThreadId)
runPipe PipeChan{..} = (,) <$> runUniPipe c2s <*> runUniPipe s2c

readPipeC :: PipeChan -> Int -> IO ByteString
readPipeC PipeChan{..} sz = readBuffered fromS (getWriteUniPipe s2c) sz

writePipeC :: PipeChan -> ByteString -> IO ()
writePipeC PipeChan{..} = writeChan $ getWriteUniPipe c2s

readPipeS :: PipeChan -> Int -> IO ByteString
readPipeS PipeChan{..} sz = readBuffered fromC (getWriteUniPipe c2s) sz

writePipeS :: PipeChan -> ByteString -> IO ()
writePipeS PipeChan{..} = writeChan $ getReadUniPipe s2c

-- helper to read buffered data.
readBuffered :: IORef ByteString -> Chan ByteString -> Int -> IO ByteString
readBuffered ref chan sz = do
    left <- readIORef ref
    if B.length left >= sz
        then do
            let (ret, nleft) = B.splitAt sz left
            writeIORef ref nleft
            return ret
        else do
            let newSize = sz - B.length left
            newData <- readChan chan
            writeIORef ref newData
            remain <- readBuffered ref chan newSize
            return (left `B.append` remain)