1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399
|
-- Module : Foundation.Conduit.Internal
-- License : BSD-style
-- Maintainer : Foundation
-- Stability : experimental
-- Portability : portable
--
-- Taken from the conduit package almost verbatim, and
-- Copyright (c) 2012 Michael Snoyman
--
{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE OverloadedStrings #-}
{-# OPTIONS_GHC -Wno-inline-rule-shadowing #-}
module Foundation.Conduit.Internal
( Pipe(..)
, Conduit(..)
, ZipSink(..)
, ResourceT(..)
, MonadResource(..)
, runResourceT
, await
, awaitForever
, yield
, yieldOr
, leftover
, runConduit
, runConduitRes
, runConduitPure
, fuse
, bracketConduit
) where
import Basement.Imports hiding (throw)
import Foundation.Monad
import Foundation.Numerical
import Basement.Monad
import Control.Monad ((>=>), liftM, void, mapM_, join)
import Control.Exception (SomeException, mask_)
import Data.IORef (atomicModifyIORef)
-- | A pipe producing and consuming values
--
-- A basic intuition is that every @Pipe@ produces a stream of /output/ values
-- and eventually indicates that this stream is terminated by sending a
-- /result/. On the receiving end of a @Pipe@, these become the /input/ and /upstream/
-- parameters.
data Pipe leftOver input output upstream monad result =
-- | Provide new output to be sent downstream. This constructor has three
-- fields: the next @Pipe@ to be used, a finalization function, and the
-- output value.
Yield (Pipe leftOver input output upstream monad result) (monad ()) output
-- | Request more input from upstream. The first field takes a new input
-- value and provides a new @Pipe@. The second takes an upstream result
-- value, which indicates that upstream is producing no more results.
| Await (input -> Pipe leftOver input output upstream monad result)
(upstream -> Pipe leftOver input output upstream monad result)
-- | Processing with this @Pipe@ is complete, providing the final result.
| Done result
-- | Require running of a monadic action to get the next @Pipe@.
| PipeM (monad (Pipe leftOver input output upstream monad result))
-- | Return leftover input, which should be provided to future operations.
| Leftover (Pipe leftOver input output upstream monad result) leftOver
instance Applicative m => Functor (Pipe l i o u m) where
fmap = (<$>)
{-# INLINE fmap #-}
instance Applicative m => Applicative (Pipe l i o u m) where
pure = Done
{-# INLINE pure #-}
Yield p c o <*> fa = Yield (p <*> fa) c o
Await p c <*> fa = Await (\i -> p i <*> fa) (\o -> c o <*> fa)
Done r <*> fa = r <$> fa
PipeM mp <*> fa = PipeM ((<*> fa) <$> mp)
Leftover p i <*> fa = Leftover (p <*> fa) i
{-# INLINE (<*>) #-}
instance (Functor m, Monad m) => Monad (Pipe l i o u m) where
return = pure
{-# INLINE return #-}
Yield p c o >>= fp = Yield (p >>= fp) c o
Await p c >>= fp = Await (p >=> fp) (c >=> fp)
Done x >>= fp = fp x
PipeM mp >>= fp = PipeM ((>>= fp) <$> mp)
Leftover p i >>= fp = Leftover (p >>= fp) i
-- | A component of a conduit pipeline, which takes a stream of
-- @input@, produces a stream of @output@, performs actions in the
-- underlying @monad@, and produces a value of @result@ when no more
-- output data is available.
newtype Conduit input output monad result = Conduit
{ unConduit :: forall a . (result -> Pipe input input output () monad a) -> Pipe input input output () monad a
}
instance Functor (Conduit i o m) where
fmap f (Conduit c) = Conduit $ \resPipe -> c (resPipe . f)
instance Applicative (Conduit i o m) where
pure x = Conduit ($ x)
{-# INLINE pure #-}
fab <*> fa = fab >>= \ab -> fa >>= \a -> pure (ab a)
{-# INLINE (<*>) #-}
instance Monad (Conduit i o m) where
return = pure
Conduit f >>= g = Conduit $ \h -> f $ \a -> unConduit (g a) h
instance MonadTrans (Conduit i o) where
lift m = Conduit $ \rest -> PipeM $ liftM rest m
instance MonadIO m => MonadIO (Conduit i o m) where
liftIO = lift . liftIO
instance MonadFailure m => MonadFailure (Conduit i o m) where
type Failure (Conduit i o m) = Failure m
mFail = lift . mFail
instance MonadThrow m => MonadThrow (Conduit i o m) where
throw = lift . throw
instance MonadCatch m => MonadCatch (Conduit i o m) where
catch (Conduit c0) onExc = Conduit $ \rest -> let
go (PipeM m) =
PipeM $ catch (liftM go m) (\x -> return $ unConduit (onExc x) rest)
go (Done r) = rest r
go (Await p c) = Await (go . p) (go . c)
go (Yield p m o) = Yield (go p) m o
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
-- | Await for a value from upstream.
await :: Conduit i o m (Maybe i)
await = Conduit $ \f -> Await (f . Just) (const (f Nothing))
{-# NOINLINE[1] await #-}
await' :: Conduit i o m r
-> (i -> Conduit i o m r)
-> Conduit i o m r
await' f g = Conduit $ \rest -> Await
(\i -> unConduit (g i) rest)
(const $ unConduit f rest)
{-# INLINE await' #-}
{-# RULES "conduit: await >>= maybe" [2] forall x y. await >>= maybe x y = await' x y #-}
awaitForever :: (input -> Conduit input output monad b) -> Conduit input output monad ()
awaitForever f = Conduit $ \rest ->
let go = Await (\i -> unConduit (f i) (const go)) rest
in go
-- | Send a value downstream.
yield :: Monad m => o -> Conduit i o m ()
yield o = Conduit $ \f -> Yield (f ()) (return ()) o
-- | Same as 'yield', but additionally takes a finalizer to be run if
-- the downstream component terminates.
yieldOr :: o
-> m () -- ^ finalizer
-> Conduit i o m ()
yieldOr o m = Conduit $ \f -> Yield (f ()) m o
-- | Provide leftover input to be consumed by the next component in
-- the current monadic binding.
leftover :: i -> Conduit i o m ()
leftover i = Conduit $ \f -> Leftover (f ()) i
-- | Run a conduit pipeline to completion.
runConduit :: Monad m => Conduit () () m r -> m r
runConduit (Conduit f) = runPipe (f Done)
-- | Run a pure conduit pipeline to completion.
runConduitPure :: Conduit () () Identity r -> r
runConduitPure = runIdentity . runConduit
-- | Run a conduit pipeline in a 'ResourceT' context for acquiring resources.
runConduitRes :: (MonadBracket m, MonadIO m) => Conduit () () (ResourceT m) r -> m r
runConduitRes = runResourceT . runConduit
bracketConduit :: MonadResource m
=> IO a
-> (a -> IO b)
-> (a -> Conduit i o m r)
-> Conduit i o m r
bracketConduit acquire cleanup inner = do
(resource, release) <- allocate acquire cleanup
result <- inner resource
release
return result
-- | Internal: run a @Pipe@
runPipe :: Monad m => Pipe () () () () m r -> m r
runPipe =
go
where
go (Yield p _ ()) = go p
go (Await _ p) = go (p ())
go (Done r) = return r
go (PipeM mp) = mp >>= go
go (Leftover p ()) = go p
-- | Send the output of the first Conduit component to the second
-- Conduit component.
fuse :: Monad m => Conduit a b m () -> Conduit b c m r -> Conduit a c m r
fuse (Conduit left0) (Conduit right0) = Conduit $ \rest ->
let goRight final left right =
case right of
Yield p c o -> Yield (recurse p) (c >> final) o
Await rp rc -> goLeft rp rc final left
Done r2 -> PipeM (final >> return (rest r2))
PipeM mp -> PipeM (liftM recurse mp)
Leftover right' i -> goRight final (Yield left final i) right'
where
recurse = goRight final left
goLeft rp rc final left =
case left of
Yield left' final' o -> goRight final' left' (rp o)
Await left' lc -> Await (recurse . left') (recurse . lc)
Done r1 -> goRight (return ()) (Done r1) (rc r1)
PipeM mp -> PipeM (liftM recurse mp)
Leftover left' i -> Leftover (recurse left') i
where
recurse = goLeft rp rc final
in goRight (return ()) (left0 Done) (right0 Done)
{- FIXME for later, if we add resourcet
-- | Safely acquire a resource and register a cleanup action for it,
-- in the context of a 'Conduit'.
bracketConduit :: MonadResource m
=> IO a -- ^ acquire
-> (a -> IO ()) -- ^ cleanup
-> (a -> Conduit i o m r)
-> Conduit i o m r
bracketConduit alloc cleanup inner = Conduit $ \rest -> PipeM $ do
(key, val) <- allocate alloc cleanup
return $ unConduit (addCleanup (const $ release key) (inside seed)) rest
addCleanup :: Monad m
=> (Bool -> m ())
-> Conduit i o m r
-> Conduit i o m r
addCleanup cleanup (Conduit c0) = Conduit $ \rest -> let
go (Done r) = PipeM (cleanup True >> return (rest r))
go (Yield src close x) = Yield
(go src)
(cleanup False >> close)
x
go (PipeM msrc) = PipeM (liftM (go) msrc)
go (Await p c) = Await
(go . p)
(go . c)
go (Leftover p i) = Leftover (go p) i
in go (c0 Done)
-}
newtype ZipSink i m r = ZipSink { getZipSink :: Conduit i () m r }
instance Monad m => Functor (ZipSink i m) where
fmap f (ZipSink x) = ZipSink (liftM f x)
instance Monad m => Applicative (ZipSink i m) where
pure = ZipSink . return
ZipSink (Conduit f0) <*> ZipSink (Conduit x0) =
ZipSink $ Conduit $ \rest -> let
go (Leftover _ i) _ = absurd i
go _ (Leftover _ i) = absurd i
go (Yield f _ ()) x = go f x
go f (Yield x _ ()) = go f x
go (PipeM mf) x = PipeM (liftM (`go` x) mf)
go f (PipeM mx) = PipeM (liftM (go f) mx)
go (Done f) (Done x) = rest (f x)
go (Await pf cf) (Await px cx) = Await
(\i -> go (pf i) (px i))
(\() -> go (cf ()) (cx ()))
go (Await pf cf) x@Done{} = Await
(\i -> go (pf i) x)
(\() -> go (cf ()) x)
go f@Done{} (Await px cx) = Await
(\i -> go f (px i))
(\() -> go f (cx ()))
in go (injectLeftovers (f0 Done)) (injectLeftovers (x0 Done))
data Void
absurd :: Void -> a
absurd _ = error "Foundation.Conduit.Internal.absurd"
injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers =
go []
where
go ls (Yield p c o) = Yield (go ls p) c o
go (l:ls) (Await p _) = go ls $ p l
go [] (Await p c) = Await (go [] . p) (go [] . c)
go _ (Done r) = Done r
go ls (PipeM mp) = PipeM (liftM (go ls) mp)
go ls (Leftover p l) = go (l:ls) p
---------------------
-- ResourceT
---------------------
newtype ResourceT m a = ResourceT { unResourceT :: PrimVar IO ReleaseMap -> m a }
instance Functor m => Functor (ResourceT m) where
fmap f (ResourceT m) = ResourceT $ \r -> fmap f (m r)
instance Applicative m => Applicative (ResourceT m) where
pure = ResourceT . const . pure
ResourceT mf <*> ResourceT ma = ResourceT $ \r ->
mf r <*> ma r
instance Monad m => Monad (ResourceT m) where
#if !MIN_VERSION_base(4,8,0)
return = ResourceT . const . return
#endif
ResourceT ma >>= f = ResourceT $ \r -> do
a <- ma r
let ResourceT f' = f a
f' r
instance MonadTrans ResourceT where
lift = ResourceT . const
instance MonadIO m => MonadIO (ResourceT m) where
liftIO = lift . liftIO
instance MonadThrow m => MonadThrow (ResourceT m) where
throw = lift . throw
instance MonadCatch m => MonadCatch (ResourceT m) where
catch (ResourceT f) g = ResourceT $ \env -> f env `catch` \e -> unResourceT (g e) env
instance MonadBracket m => MonadBracket (ResourceT m) where
generalBracket acquire onSuccess onExc inner = ResourceT $ \env -> generalBracket
(unResourceT acquire env)
(\x y -> unResourceT (onSuccess x y) env)
(\x y -> unResourceT (onExc x y) env)
(\x -> unResourceT (inner x) env)
data ReleaseMap =
ReleaseMap !NextKey !RefCount ![(Word, (ReleaseType -> IO ()))] -- FIXME use a proper Map?
| ReleaseMapClosed
data ReleaseType = ReleaseEarly
| ReleaseNormal
| ReleaseException
type RefCount = Word
type NextKey = Word
runResourceT :: (MonadBracket m, MonadIO m) => ResourceT m a -> m a
runResourceT (ResourceT inner) = generalBracket
(liftIO $ primVarNew $ ReleaseMap maxBound (minBound + 1) [])
(\state _res -> liftIO $ cleanup state ReleaseNormal)
(\state _exc -> liftIO $ cleanup state ReleaseException)
inner
where
cleanup istate rtype = do
mm <- atomicModifyIORef istate $ \rm ->
case rm of
ReleaseMap nk rf m ->
let rf' = rf - 1
in if rf' == minBound
then (ReleaseMapClosed, Just m)
else (ReleaseMap nk rf' m, Nothing)
ReleaseMapClosed -> error "runResourceT: cleanup on ReleaseMapClosed"
case mm of
Just m -> mapM_ (\(_, x) -> ignoreExceptions (x rtype)) m
Nothing -> return ()
where
ignoreExceptions io = void io `catch` (\(_ :: SomeException) -> return ())
allocate :: (MonadResource m, MonadIO n) => IO a -> (a -> IO b) -> m (a, n ())
allocate acquire release = liftResourceT $ ResourceT $ \istate -> liftIO $ mask_ $ do
a <- acquire
key <- atomicModifyIORef istate $ \rm ->
case rm of
ReleaseMap key rf m ->
( ReleaseMap (key - 1) rf ((key, const $ void $ release a) : m)
, key
)
ReleaseMapClosed -> error "allocate: ReleaseMapClosed"
let release' = join $ atomicModifyIORef istate $ \rm ->
case rm of
ReleaseMap nextKey rf m ->
let loop front [] = (ReleaseMap nextKey rf (front []), return ())
loop front ((key', action):rest)
| key == key' =
( ReleaseMap nextKey rf (front rest)
, action ReleaseEarly
)
| otherwise = loop (front . ((key', action):)) rest
in loop id m
ReleaseMapClosed -> error "allocate: ReleaseMapClosed (2)"
return (a, liftIO release')
class MonadIO m => MonadResource m where
liftResourceT :: ResourceT IO a -> m a
instance MonadIO m => MonadResource (ResourceT m) where
liftResourceT (ResourceT f) = ResourceT (liftIO . f)
instance MonadResource m => MonadResource (Conduit i o m) where
liftResourceT = lift . liftResourceT
|