File: Process.hs

package info (click to toggle)
haskell-conduit-extra 1.3.8-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 288 kB
  • sloc: haskell: 2,601; makefile: 3
file content (208 lines) | stat: -rw-r--r-- 8,544 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | A full tutorial for this module is available at:
-- <https://github.com/snoyberg/conduit/blob/master/PROCESS.md>.
--
-- Some utilities in this module require the threaded runtime because they use
-- 'System.Process.waitForProcess' internally.
--
-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. In particular, it:
--
-- * Provides orphan instances for conduit
--
-- * Provides some useful helper functions
module Data.Conduit.Process
    ( -- * Functions
      sourceCmdWithConsumer
    , sourceProcessWithConsumer
    , sourceCmdWithStreams
    , sourceProcessWithStreams
    , withCheckedProcessCleanup
      -- * InputSource types
    , FlushInput(..)
    , BuilderInput(..)
      -- * Reexport
    , module Data.Streaming.Process
    ) where

import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket, catch)
import System.IO.Error (ioeGetErrorType, isResourceVanishedErrorType)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif

instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sinkHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sinkHandle h, liftIO $ hClose h), Just CreatePipe)

-- | Wrapper for input source which accepts 'Data.ByteString.Builder.Builder's.
-- You can pass 'Data.ByteString.Builder.Extra.flush' to flush the input. Note
-- that the pipe will /not/ automatically close when the processing completes.
--
-- @since 1.3.2
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)

-- | Wrapper for input source  which accepts @Flush@es. Note that the pipe
-- will /not/ automatically close then processing completes.
--
-- @since 1.3.2
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)

instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
  isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
  isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
  isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
  isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)

instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sourceHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sourceHandle h, liftIO $ hClose h), Just CreatePipe)

-- | Given a @CreateProcess@, run the process, with its output being used as a
-- @Source@ to feed the provided @Consumer@. Once the process has completed,
-- return a tuple of the @ExitCode@ from the process and the output collected
-- from the @Consumer@.
--
-- Note that, if an exception is raised by the consumer, the process is /not/
-- terminated. This behavior is different from 'sourceProcessWithStreams' due
-- to historical reasons.
--
-- Requires the threaded runtime.
--
-- Since 1.1.2
sourceProcessWithConsumer :: MonadIO m
                          => CreateProcess
                          -> ConduitT ByteString Void m a -- ^ stdout
                          -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
    res <- runConduit $ source .| consumer
    close
    ec <- waitForStreamingProcess cph
    return (ec, res)

-- | Like @sourceProcessWithConsumer@ but providing the command to be run as
-- a @String@.
--
-- Requires the threaded runtime.
--
-- Since 1.1.2
sourceCmdWithConsumer :: MonadIO m
                      => String                  -- ^command
                      -> ConduitT ByteString Void m a -- ^stdout
                      -> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)


-- | Given a @CreateProcess@, run the process
-- and feed the provided @Producer@
-- to the stdin @Sink@ of the process.
-- Use the process outputs (stdout, stderr) as @Source@s
-- and feed it to the provided @Consumer@s.
-- Once the process has completed,
-- return a tuple of the @ExitCode@ from the process
-- and the results collected from the @Consumer@s.
--
-- If an exception is raised by any of the streams,
-- the process is terminated.
--
-- IO is required because the streams are run concurrently
-- using the <https://hackage.haskell.org/package/async async> package
--
-- Requires the threaded runtime.
--
-- @since 1.1.12
sourceProcessWithStreams
  :: MonadUnliftIO m
  => CreateProcess
  -> ConduitT () ByteString m () -- ^stdin
  -> ConduitT ByteString Void m a -- ^stdout
  -> ConduitT ByteString Void m b -- ^stderr
  -> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
  withUnliftIO $ \u -> do
    (  (sinkStdin, closeStdin)
     , (sourceStdout, closeStdout)
     , (sourceStderr, closeStderr)
     , sph) <- streamingProcess cp
    let safeSinkStdin = sinkStdin `catchC` ignoreStdinClosed
        safeCloseStdin = closeStdin `catch` ignoreStdinClosed
    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| safeSinkStdin) `finally` safeCloseStdin)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
      `finally` (closeStdout >> closeStderr)
      `onException` terminateStreamingProcess sph
    ec <- waitForStreamingProcess sph
    return (ec, resStdout, resStderr)
  where
    ignoreStdinClosed :: forall m. (MonadIO m) => IOError -> m ()
    ignoreStdinClosed e =
      if isResourceVanishedErrorType (ioeGetErrorType e)
        then pure ()
        else liftIO (throwIO e)

-- | Like @sourceProcessWithStreams@ but providing the command to be run as
-- a @String@.
--
-- Requires the threaded runtime.
--
-- @since 1.1.12
sourceCmdWithStreams
  :: MonadUnliftIO m
  => String                   -- ^command
  -> ConduitT () ByteString m () -- ^stdin
  -> ConduitT ByteString Void m a -- ^stdout
  -> ConduitT ByteString Void m b -- ^stderr
  -> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)

-- | Same as 'withCheckedProcess', but kills the child process in the case of
-- an exception being thrown by the provided callback function.
--
-- Requires the threaded runtime.
--
-- @since 1.1.11
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadUnliftIO m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
    (streamingProcess cp)
    (\(_, _, _, sph) -> closeStreamingProcessHandle sph)
    $ \(x, y, z, sph) -> do
        res <- run (f x y z) `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        if ec == ExitSuccess
            then return res
            else throwIO $ ProcessExitedUnsuccessfully cp ec


terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw