File: Zlib.hs

package info (click to toggle)
haskell-conduit-extra 1.3.8-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 288 kB
  • sloc: haskell: 2,601; makefile: 3
file content (237 lines) | stat: -rw-r--r-- 8,952 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
-- | Streaming compression and decompression using conduits.
--
-- Parts of this code were taken from zlib-enum and adapted for conduits.
module Data.Conduit.Zlib (
    -- * Conduits
    compress, decompress, gzip, ungzip,
    -- * Flushing
    compressFlush, decompressFlush,
    -- * Decompression combinators
    multiple,
    -- * Re-exported from zlib-bindings
    WindowBits (..), defaultWindowBits
) where

import Data.Streaming.Zlib
import Data.Conduit
import Data.ByteString (ByteString)
import qualified Data.ByteString as S
import Control.Monad (unless, liftM)
import Control.Monad.Trans.Class (lift, MonadTrans)
import Control.Monad.Primitive (PrimMonad, unsafePrimToPrim)
import Control.Monad.Trans.Resource (MonadThrow, throwM)
import Data.Function (fix)

-- | Compress a stream into the gzip format.
--
-- Uses compression level @-1@ (zlib's @Z_DEFAULT_COMPRESSION@) and window
-- bits 31, i.e. a 15-bit window plus 16, which makes zlib write a gzip
-- header and trailer instead of a zlib wrapper.
gzip :: (MonadThrow m, PrimMonad m) => ConduitT ByteString ByteString m ()
gzip = compress defaultLevel gzipBits
  where
    defaultLevel :: Int
    defaultLevel = -1

    gzipBits :: WindowBits
    gzipBits = WindowBits 31

-- | Decompress a single gzip-formatted entity.
--
-- Uses window bits 31 (a 15-bit window plus 16), which tells zlib to expect
-- a gzip header and trailer. See 'multiple' for concatenated gzip data.
ungzip :: (PrimMonad m, MonadThrow m) => ConduitT ByteString ByteString m ()
ungzip = decompress gzipBits
  where
    gzipBits :: WindowBits
    gzipBits = WindowBits 31

-- | Run an 'IO' action in an arbitrary 'PrimMonad' by coercing it with
-- 'unsafePrimToPrim'.
--
-- This is unsafe in general (for example when @m@ is 'ST'). In this module
-- it only wraps the zlib calls on inflater/deflater state that is created
-- and consumed inside a single conduit.
unsafeLiftIO :: (PrimMonad m, MonadThrow m) => IO a -> m a
unsafeLiftIO action = unsafePrimToPrim action

-- |
-- Decompress (inflate) a stream of 'ByteString's. For example:
--
-- > runConduitRes $ sourceFile "test.z" .| decompress defaultWindowBits .| sinkFile "test"
--
-- Only a single compressed entity is decompressed. Input remaining after the
-- end of that entity is pushed back upstream with 'leftover'. Use 'multiple'
-- to decompress concatenated entities.

decompress
    :: (PrimMonad m, MonadThrow m)
    => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
    -> ConduitT ByteString ByteString m ()
decompress =
    helperDecompress (liftM (fmap Chunk) await) yield' leftover
  where
    -- The helper only emits 'Flush' in response to an upstream 'Flush',
    -- which never occurs here because every input is wrapped in 'Chunk'.
    -- The case exists to keep the match total.
    yield' Flush = return ()
    yield' (Chunk bs) = yield bs

-- | Like 'decompress', but over 'Flush'-wrapped data so the caller can force
-- output.
--
-- Each upstream 'Flush' drains any pending inflated bytes (via
-- 'flushInflate') and is then passed on downstream. Unconsumed compressed
-- input is pushed back upstream as a 'Chunk'.
decompressFlush
    :: (PrimMonad m, MonadThrow m)
    => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
    -> ConduitT (Flush ByteString) (Flush ByteString) m ()
decompressFlush config = helperDecompress await yield pushBack config
  where
    -- Leftover compressed bytes are returned to upstream as a data chunk.
    pushBack bytes = leftover (Chunk bytes)

-- | Shared driver behind 'decompress' and 'decompressFlush'.
--
-- The caller supplies three operations:
--
-- * @await'@ reads the next input element (@Nothing@ at end of input),
-- * @yield'@ emits an output element,
-- * @leftover'@ pushes unconsumed compressed bytes back upstream.
--
-- This lets one loop serve both the plain and the 'Flush'-aware conduits.
--
-- As implemented below:
--
-- * One inflater is created up front and used for the whole run.
-- * Each 'Chunk' is fed to it, and every decompressed piece is yielded as a
--   'Chunk'.
-- * An upstream 'Flush' drains pending output ('flushInflate'), then yields
--   'Flush' and continues.
-- * If the inflater reports unused input after a chunk, the compressed
--   entity is treated as finished. Remaining output is flushed, the unused
--   bytes are handed to @leftover'@, and the loop stops without awaiting
--   further input.
-- * At end of input, remaining output is flushed and any unused bytes are
--   handed to @leftover'@.
-- * zlib errors ('PRError') are rethrown with 'throwM'.
helperDecompress :: (Monad (t m), PrimMonad m, MonadThrow m, MonadTrans t)
                 => t m (Maybe (Flush ByteString))
                 -> (Flush ByteString -> t m ())
                 -> (ByteString -> t m ())
                 -> WindowBits
                 -> t m ()
helperDecompress await' yield' leftover' config = do
    -- Initialize the stateful inflater, which will be used below
    -- This inflater is never exposed outside of this function
    inf <- lift $ unsafeLiftIO $ initInflate config

    -- Some helper functions used by the main feeder loop below

    let -- Flush any remaining inflated bytes downstream
        flush = do
            chunk <- lift $ unsafeLiftIO $ flushInflate inf
            unless (S.null chunk) $ yield' $ Chunk chunk

        -- Get any input which is unused by the inflater
        getUnused = lift $ unsafeLiftIO $ getUnusedInflate inf

        -- If there is any unused data, return it as leftovers to the stream
        unused = do
            rem' <- getUnused
            unless (S.null rem') $ leftover' rem'

    -- Main loop: feed data from upstream into the inflater
    fix $ \feeder -> do
        mnext <- await'
        case mnext of
            -- No more data is available from upstream
            Nothing -> do
                -- Flush any remaining uncompressed data
                flush
                -- Return the rest of the unconsumed data as leftovers
                unused
            -- Another chunk of compressed data arrived
            Just (Chunk x) -> do
                -- Feed the compressed data into the inflater, returning a
                -- "popper" which will return chunks of decompressed data
                popper <- lift $ unsafeLiftIO $ feedInflate inf x

                -- Loop over the popper grabbing decompressed chunks and
                -- yielding them downstream
                fix $ \pop -> do
                    mbs <- lift $ unsafeLiftIO popper
                    case mbs of
                        -- No more data from this popper
                        PRDone -> do
                            rem' <- getUnused
                            if S.null rem'
                                -- No data was unused by the inflater, so let's
                                -- fill it up again and get more data out of it
                                -- NOTE(review): if the compressed entity ends
                                -- exactly at this chunk's boundary, nothing is
                                -- unused and we keep awaiting; the next chunk
                                -- is then fed to an already-finished inflater.
                                -- The outcome depends on how the inflater
                                -- treats input past the end of the stream —
                                -- confirm for 'multiple'.
                                then feeder
                                -- In this case, there is some unconsumed data,
                                -- meaning the compressed stream is complete.
                                -- At this point, we need to stop feeding,
                                -- return the unconsumed data as leftovers, and
                                -- flush any remaining content (which should be
                                -- nothing)
                                else do
                                    flush
                                    leftover' rem'
                        -- Another chunk available, yield it downstream and
                        -- loop again
                        PRNext bs -> do
                            yield' (Chunk bs)
                            pop
                        -- An error occurred inside zlib, throw it
                        PRError e -> lift $ throwM e
            -- We've been asked to flush the stream
            Just Flush -> do
                -- Get any uncompressed data waiting for us
                flush
                -- Put a Flush in the stream
                yield' Flush
                -- Feed in more data
                feeder

-- |
-- Compress (deflate) a stream of 'ByteString's.
--
-- The 'WindowBits' argument also selects the container format (raw zlib
-- vs. gzip), following the zlib C library's conventions.

compress
    :: (PrimMonad m, MonadThrow m)
    => Int         -- ^ Compression level
    -> WindowBits  -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
    -> ConduitT ByteString ByteString m ()
compress level config =
    helperCompress (fmap Chunk <$> await) emit level config
  where
    -- A plain ByteString stream has no flush markers. The helper only emits
    -- 'Flush' in response to an upstream 'Flush', so it is dropped here.
    emit (Chunk bytes) = yield bytes
    emit Flush         = return ()

-- | Like 'compress', but over 'Flush'-wrapped data so the caller can force
-- output.
--
-- Each upstream 'Flush' triggers a deflate flush ('flushDeflate'); the
-- resulting bytes are yielded and the 'Flush' is then passed on downstream.
compressFlush
    :: (PrimMonad m, MonadThrow m)
    => Int         -- ^ Compression level
    -> WindowBits  -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library)
    -> ConduitT (Flush ByteString) (Flush ByteString) m ()
compressFlush level config = helperCompress await yield level config

-- | Shared driver behind 'compress' and 'compressFlush'.
--
-- @await'@ reads the next input element (@Nothing@ at end of input) and
-- @yield'@ emits an output element.
--
-- * The deflater is only created once the first element arrives. An empty
--   upstream therefore produces no output at all, not even a header.
-- * A 'Chunk' is fed to the deflater, and every compressed piece it
--   releases is yielded.
-- * A 'Flush' runs 'flushDeflate' until it reports 'PRDone', yields every
--   resulting chunk, and only then yields 'Flush'.
-- * At end of input the stream is finalised by running 'finishDeflate'
--   until 'PRDone'.
-- * zlib errors ('PRError') are rethrown with 'throwM'.
helperCompress :: (Monad (t m), PrimMonad m, MonadThrow m, MonadTrans t)
               => t m (Maybe (Flush ByteString))
               -> (Flush ByteString -> t m ())
               -> Int
               -> WindowBits
               -> t m ()
helperCompress await' yield' level config =
    await' >>= maybe (return ()) start
  where
    -- Create the deflater lazily, on the first input element.
    start input = do
        def <- lift $ unsafeLiftIO $ initDeflate level config
        push def input

    continue def = await' >>= maybe (close def) (push def)

    -- Run a popper repeatedly, yielding each chunk, until it is exhausted.
    goPopper popper = do
        mbs <- lift $ unsafeLiftIO popper
        case mbs of
            PRDone -> return ()
            PRNext bs -> yield' (Chunk bs) >> goPopper popper
            PRError e -> lift $ throwM e

    push def (Chunk x) = do
        popper <- lift $ unsafeLiftIO $ feedDeflate def x
        goPopper popper
        continue def

    push def Flush = do
        -- One flushDeflate step returns at most one output buffer. zlib
        -- requires repeating the flush while the buffer fills up, so drain
        -- the popper completely. Otherwise the 'Flush' marker could overtake
        -- bytes that were written before it.
        goPopper (flushDeflate def)
        yield' Flush
        continue def

    -- Finish the compressed stream, emitting the trailer and any remaining
    -- buffered output.
    close def = goPopper (finishDeflate def)

-- | The standard 'decompress' and 'ungzip' functions will only decompress a
-- single compressed entity from the stream. This combinator will exhaust the
-- stream completely of all individual compressed entities. This is useful for
-- cases where you have a concatenated archive, e.g. @cat file1.gz file2.gz >
-- combined.gz@.
--
-- Usage:
--
-- > runConduitRes $ sourceFile "combined.gz" .| multiple ungzip .| sinkList
--
-- This combinator will not fail on an empty stream. If you want to ensure that
-- at least one compressed entity in the stream exists, consider a usage such
-- as:
--
-- > runConduitRes $ sourceFile "combined.gz" .| (ungzip >> multiple ungzip) .| sinkList
--
-- @since 1.1.10
multiple :: Monad m
         => ConduitT ByteString a m ()
         -> ConduitT ByteString a m ()
multiple inner =
    loop
  where
    loop = do
        mbs <- await
        case mbs of
            -- Upstream exhausted: every entity has been processed.
            Nothing -> return ()
            Just bs
                -- Skip empty chunks so 'inner' is only started on real input.
                | S.null bs -> loop
                -- Push the chunk back so 'inner' sees the start of the next
                -- entity, run it to completion, then look for another one.
                -- NOTE(review): if 'inner' returns without consuming any
                -- input, the same chunk is pushed back and 'inner' is rerun
                -- indefinitely.
                | otherwise -> do
                    leftover bs
                    inner
                    loop