1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
|
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | A full tutorial for this module is available at:
-- <https://github.com/snoyberg/conduit/blob/master/PROCESS.md>.
--
-- Some utilities in this module require the threaded runtime because they use
-- 'System.Process.waitForProcess' internally.
--
-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. In particular, it:
--
-- * Provides orphan instances for conduit
--
-- * Provides some useful helper functions
module Data.Conduit.Process
( -- * Functions
sourceCmdWithConsumer
, sourceProcessWithConsumer
, sourceCmdWithStreams
, sourceProcessWithStreams
, withCheckedProcessCleanup
-- * InputSource types
, FlushInput(..)
, BuilderInput(..)
-- * Reexport
, module Data.Streaming.Process
) where
import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket, catch)
import System.IO.Error (ioeGetErrorType, isResourceVanishedErrorType)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif
-- | Allows a conduit that consumes 'ByteString's to stand in for a process's
-- standard input. A pipe is requested via 'CreatePipe'; the handle that comes
-- back is switched to 'NoBuffering' and wrapped with 'sinkHandle'.
instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably a handle is always
        -- supplied when 'CreatePipe' is requested -- confirm.
        ( \(Just h) -> do
            hSetBuffering h NoBuffering
            return (sinkHandle h)
        , Just CreatePipe
        )
-- | Variant of the 'ConduitM' input instance that also hands back an action
-- which closes the underlying handle ('hClose'). The handle is set to
-- 'NoBuffering' before the pair is returned.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> (sinkHandle h, liftIO (hClose h)) <$ hSetBuffering h NoBuffering
        , Just CreatePipe
        )
-- | Wrapper for an input source which accepts
-- 'Data.ByteString.Builder.Builder's. You can pass
-- 'Data.ByteString.Builder.Extra.flush' to flush the input.
--
-- Note that the pipe will /not/ automatically close when the processing
-- completes. To obtain an explicit close action, request the paired form
-- @('BuilderInput' o m r, n r')@, whose second component calls 'hClose' on the
-- underlying handle.
--
-- @since 1.3.2
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)
-- | Wrapper for an input source which accepts @Flush@es (chunks of
-- 'ByteString' interleaved with flush markers).
--
-- Note that the pipe will /not/ automatically close when processing
-- completes. To obtain an explicit close action, request the paired form
-- @('FlushInput' o m r, n r')@, whose second component calls 'hClose' on the
-- underlying handle.
--
-- @since 1.3.2
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)
-- | Feeds a process's standard input from 'Builder's: a pipe is requested via
-- 'CreatePipe' and the handle is wrapped with 'sinkHandleBuilder'. Unlike the
-- plain 'ConduitM' instance, the handle's buffering mode is left untouched.
instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> pure (BuilderInput (sinkHandleBuilder h))
        , Just CreatePipe
        )
-- | Like the 'BuilderInput' instance, but paired with an action that closes
-- the underlying handle via 'hClose'. The buffering mode is not changed.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) ->
            let builderSink = BuilderInput (sinkHandleBuilder h)
                closeInput = liftIO (hClose h)
             in return (builderSink, closeInput)
        , Just CreatePipe
        )
-- | Feeds a process's standard input from a stream of @Flush ByteString@
-- values: a pipe is requested via 'CreatePipe' and the handle is wrapped with
-- 'sinkHandleFlush'. The handle's buffering mode is left untouched.
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> (return . FlushInput . sinkHandleFlush) h
        , Just CreatePipe
        )
-- | Like the 'FlushInput' instance, but paired with an action that closes the
-- underlying handle via 'hClose'. The buffering mode is not changed.
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
    isStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> pure (FlushInput (sinkHandleFlush h), liftIO (hClose h))
        , Just CreatePipe
        )
-- | Allows a conduit that produces 'ByteString's to stand in for a process
-- output stream (stdout or stderr). A pipe is requested via 'CreatePipe'; the
-- handle is switched to 'NoBuffering' and wrapped with 'sourceHandle'.
instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    osStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> hSetBuffering h NoBuffering >> return (sourceHandle h)
        , Just CreatePipe
        )
-- | Variant of the 'ConduitM' output instance that also hands back an action
-- which closes the underlying handle ('hClose'). The handle is set to
-- 'NoBuffering' before the pair is returned.
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    osStdStream =
        -- NOTE(review): only 'Just' is matched; presumably 'CreatePipe' always
        -- yields a handle -- confirm.
        ( \(Just h) -> do
            hSetBuffering h NoBuffering
            return (sourceHandle h, liftIO (hClose h))
        , Just CreatePipe
        )
-- | Launch the process described by a @CreateProcess@ and stream its standard
-- output into the given consumer. stdin and stderr are requested as
-- 'ClosedStream'.
--
-- The steps are, in order: run the consumer over stdout to completion, close
-- the stdout handle, then wait for the process to exit. The result pairs the
-- process's @ExitCode@ with the value produced by the consumer.
--
-- Note that, if the consumer raises an exception, the process is /not/
-- terminated (and the steps after the consumer are skipped). This differs
-- from 'sourceProcessWithStreams' for historical reasons.
--
-- Requires the threaded runtime.
--
-- @since 1.1.2
sourceProcessWithConsumer
    :: MonadIO m
    => CreateProcess
    -> ConduitT ByteString Void m a -- ^ stdout
    -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (stdoutSource, closeStdout), ClosedStream, sph) <-
        streamingProcess cp
    consumed <- runConduit (stdoutSource .| consumer)
    closeStdout
    (\exitCode -> (exitCode, consumed)) <$> waitForStreamingProcess sph
-- | Convenience wrapper around 'sourceProcessWithConsumer': the command is
-- given as a @String@ and turned into a @CreateProcess@ with 'shell'.
--
-- Requires the threaded runtime.
--
-- @since 1.1.2
sourceCmdWithConsumer
    :: MonadIO m
    => String                       -- ^ command
    -> ConduitT ByteString Void m a -- ^ stdout
    -> m (ExitCode, a)
sourceCmdWithConsumer = sourceProcessWithConsumer . shell
-- | Given a @CreateProcess@, run the process and feed the provided producer
-- to the process's stdin. The process outputs (stdout, stderr) are used as
-- sources and fed to the provided consumers. Once the process has completed,
-- return a tuple of the @ExitCode@ from the process and the results collected
-- from the two consumers.
--
-- The three streams are run concurrently using the
-- <https://hackage.haskell.org/package/async async> package, which is why the
-- base monad must be unliftable to 'IO'.
--
-- Cleanup behaviour:
--
-- * stdin is closed as soon as the producer finishes (or fails);
--
-- * an 'IOError' raised while writing to or closing stdin is ignored when its
--   error type satisfies 'isResourceVanishedErrorType'; any other error is
--   rethrown;
--
-- * stdout and stderr are both closed once all streams have finished, even if
--   closing the first of them throws;
--
-- * if any of the streams (or the cleanup) raises an exception, the process
--   is terminated and the exception propagates.
--
-- Requires the threaded runtime.
--
-- @since 1.1.12
sourceProcessWithStreams
    :: MonadUnliftIO m
    => CreateProcess
    -> ConduitT () ByteString m ()   -- ^ stdin
    -> ConduitT ByteString Void m a  -- ^ stdout
    -> ConduitT ByteString Void m b  -- ^ stderr
    -> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
    withUnliftIO $ \u -> do
        ( (sinkStdin, closeStdin)
          , (sourceStdout, closeStdout)
          , (sourceStderr, closeStderr)
          , sph ) <- streamingProcess cp
        let safeSinkStdin = sinkStdin `catchC` ignoreStdinClosed
            safeCloseStdin = closeStdin `catch` ignoreStdinClosed
        (_, resStdout, resStderr) <-
            runConcurrently
                ( (,,)
                    <$> Concurrently
                            ( (unliftIO u $ runConduit $ producerStdin .| safeSinkStdin)
                                `finally` safeCloseStdin
                            )
                    <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
                    <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr)
                )
                -- Nested 'finally' so that stderr is still closed when closing
                -- stdout throws; a plain '>>' would skip the second close.
                `finally` (closeStdout `finally` closeStderr)
                `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        return (ec, resStdout, resStderr)
  where
    -- Swallow "resource vanished" I/O errors on stdin; rethrow everything
    -- else unchanged.
    ignoreStdinClosed :: forall m. (MonadIO m) => IOError -> m ()
    ignoreStdinClosed e =
        if isResourceVanishedErrorType (ioeGetErrorType e)
            then pure ()
            else liftIO (throwIO e)
-- | Convenience wrapper around 'sourceProcessWithStreams': the command is
-- given as a @String@ and turned into a @CreateProcess@ with 'shell'.
--
-- Requires the threaded runtime.
--
-- @since 1.1.12
sourceCmdWithStreams
    :: MonadUnliftIO m
    => String                        -- ^ command
    -> ConduitT () ByteString m ()   -- ^ stdin
    -> ConduitT ByteString Void m a  -- ^ stdout
    -> ConduitT ByteString Void m b  -- ^ stderr
    -> m (ExitCode, a, b)
sourceCmdWithStreams = sourceProcessWithStreams . shell
-- | Same as 'withCheckedProcess', but additionally kills the child process
-- when the supplied callback throws an exception.
--
-- After the callback returns, the process is waited for; any exit code other
-- than 'ExitSuccess' is reported by throwing 'ProcessExitedUnsuccessfully'.
-- The streaming process handle is closed in every case.
--
-- Requires the threaded runtime.
--
-- @since 1.1.11
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadUnliftIO m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f = withRunInIO $ \runInIO ->
    bracket
        (streamingProcess cp)
        (\(_, _, _, sph) -> closeStreamingProcessHandle sph)
        ( \(inp, out, err, sph) -> do
            result <-
                runInIO (f inp out err)
                    `onException` terminateStreamingProcess sph
            exitCode <- waitForStreamingProcess sph
            case exitCode of
                ExitSuccess -> return result
                _ -> throwIO $ ProcessExitedUnsuccessfully cp exitCode
        )
-- | Terminate the child process behind a 'StreamingProcessHandle' by calling
-- 'terminateProcess' on its raw process handle.
terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess sph =
    liftIO (terminateProcess (streamingProcessHandleRaw sph))
|