File: Streaming.hs

package info (click to toggle)
haskell-zstd 0.1.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,988 kB
  • sloc: ansic: 26,812; haskell: 1,085; makefile: 4
file content (197 lines) | stat: -rw-r--r-- 6,996 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
-- Copyright (c) 2016-present, Facebook, Inc.
-- All rights reserved.
--
-- This source code is licensed under the BSD-style license found in
-- the LICENSE file in the root directory of this source tree. An
-- additional grant of patent rights can be found in the PATENTS file
-- in the same directory.

{-# LANGUAGE MultiWayIf #-}

-- |
-- Module      : Codec.Compression.Zstd.Streaming
-- Copyright   : (c) 2016-present, Facebook, Inc. All rights reserved.
--
-- License     : BSD3
-- Maintainer  : bryano@fb.com
-- Stability   : experimental
-- Portability : GHC
--
-- Streaming compression and decompression support for zstd.

module Codec.Compression.Zstd.Streaming
    (
      Result(..)
    , compress
    , decompress
    , maxCLevel
    ) where

import Codec.Compression.Zstd.FFI hiding (compress, decompress)
import Codec.Compression.Zstd.FFI.Types (peekPos)
import qualified Data.ByteString as B
import Data.ByteString.Internal (ByteString(..), mallocByteString)
import Foreign.Marshal.Alloc (finalizerFree, malloc)
import Foreign.C.Types (CSize)
import Foreign.ForeignPtr
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Storable (poke)
import Foreign.Ptr (Ptr, plusPtr)
import Data.Word (Word8)

-- | The result of a streaming compression or decompression step.
--
-- A stream is driven by inspecting each 'Result' in turn and acting on
-- it, until either 'Done' or 'Error' is reached.
data Result
  = Produce ByteString (IO Result)
    -- ^ A chunk of transformed output, and an action that when
    -- executed will yield the next step in the streaming operation.
    -- The chunk is part of the output stream and must be written out.
    -- The action is ephemeral; you should discard it as soon as you
    -- use it.
  | Consume (ByteString -> IO Result)
    -- ^ Provide the function with more input for the streaming
    -- operation to continue.  This function is ephemeral. You should
    -- call it exactly once, and discard it immediately after you call
    -- it.
    --
    -- To signal the end of a stream of data, supply an 'B.empty'
    -- input.
  | Error String String
    -- ^ An error has occurred.  The first field names the step that
    -- failed (for example @"compress"@, @"initStream"@ or
    -- @"endStream"@); the second describes the error.  If an error
    -- occurs, the streaming operation cannot continue.
  | Done ByteString
    -- ^ The streaming operation has ended.  This payload may be
    -- empty. If it is not, it must be written out.
    --
    -- When compressing, a non-empty payload consists of a frame
    -- epilogue, possibly preceded by any data left over from the final
    -- streaming step.  When decompressing, it holds the last of the
    -- decompressed data.

-- | Debug rendering; continuation functions cannot be shown and are
-- printed as @_@.
instance Show Result where
    show result = case result of
      Produce bs _ -> unwords ["Produce", show bs, "_"]
      Consume _    -> "Consume _"
      Error n d    -> unwords ["Error", show n, show d]
      Done bs      -> unwords ["Done", show bs]

-- | Begin a streaming compression operation.
--
-- The initial result will be either an 'Error' or a 'Consume'.
compress :: Int
         -- ^ Compression level. Must be >= 1 and <= 'maxCLevel'.
         -> IO Result
compress level
  | level < 1 || level > maxCLevel =
    return (Error "compress" "unsupported compression level")
  | otherwise =
  streaming
  createCStream
  p_freeCStream
  outSize
  (\cs -> initCStream cs (fromIntegral level))
  compressStream
  -- The streaming driver's buffer always has capacity outSize; the
  -- position it passes is re-read from the descriptor instead.
  (\cfp obfp _opos dfp -> finish outSize cfp obfp dfp)
 where
  outSize = fromIntegral cstreamOutSize
  -- Flush the compressor and write the frame epilogue into the buffer
  -- dfp currently installed in obfp, which is 'capacity' bytes long.
  -- endStream returns the number of bytes still waiting to be flushed;
  -- while that is non-zero, emit what was written and retry with a fresh
  -- buffer of exactly that size (tracking its real capacity so that
  -- 'shrink' judges wasted space correctly).
  finish capacity cfp obfp dfp =
    check "endStream"
      (withForeignPtr cfp $ \cptr ->
         withForeignPtr obfp (endStream cptr) <* touchForeignPtr dfp)
      $ \leftover -> do
        opos1 <- fromIntegral `fmap` withForeignPtr obfp peekPos
        bs <- shrink capacity dfp opos1
        -- leftover is an unsigned CSize, so zero is the only "done" case.
        if leftover == 0
          then return (Done bs)
          else do
            let capacity1 = fromIntegral leftover
            dfp1 <- mallocByteString capacity1
            withForeignPtr dfp1 $ \dp ->
              withForeignPtr obfp $ \obuf -> poke obuf (buffer dp leftover)
            return (Produce bs (finish capacity1 cfp obfp dfp1))

-- | One streaming step: given the stream context, the output buffer
-- descriptor and the input buffer descriptor, advance the stream.  The
-- returned 'CSize' is tested with 'isError' by the caller, which then
-- reads the updated positions back with 'peekPos'.  (The @io@ type
-- parameter does not appear on the right-hand side.)
type ConsumeBlock ctx io = Ptr ctx -> Ptr (Buffer Out)
                         -> Ptr (Buffer In) -> IO CSize

-- | End-of-input handler: receives the stream context, the output buffer
-- descriptor, the number of bytes already written into the current
-- output buffer, and that buffer; it produces the remaining step(s) of
-- the operation.  (The @io@ type parameter does not appear on the
-- right-hand side.)
type Finish ctx io = ForeignPtr ctx -> ForeignPtr (Buffer Out)
                   -> Int -> ForeignPtr Word8 -> IO Result

-- | Shared driver for streaming compression and decompression.
--
-- Allocates a stream context (freed by the given finalizer once it is
-- no longer reachable), initialises it, allocates input and output
-- buffer descriptors, and returns a 'Consume' step.  Each non-empty
-- input chunk is fed to the step function until it is fully consumed;
-- whenever the output buffer fills up it is emitted as a 'Produce'.
-- An empty chunk hands control to the 'Finish' handler.
streaming :: IO (Ptr ctx)
          -- ^ allocate a stream context
          -> FinalizerPtr ctx
          -- ^ finalizer that frees the context
          -> Int
          -- ^ capacity, in bytes, of each output buffer
          -> (Ptr ctx -> IO CSize)
          -- ^ initialise the context; the result is checked with 'isError'
          -> ConsumeBlock ctx io
          -- ^ perform one streaming step
          -> Finish ctx io
          -- ^ invoked when the caller signals the end of input
          -> IO Result
streaming createStream freeStream outSize initStream consumeBlock finish = do
  cx <- checkAlloc "createStream" createStream
  cxfp <- newForeignPtr freeStream cx
  check "initStream" (initStream cx) $ \_ -> do
    -- Descriptors for the input and output regions; consumeBlock updates
    -- their positions, which are read back with peekPos after each step.
    ibfp <- newForeignPtr finalizerFree =<< malloc
    obfp <- newForeignPtr finalizerFree =<< malloc
    dfp <- newOutput obfp
    advanceInput cxfp ibfp obfp 0 dfp
 where
  -- Ask the caller for the next input chunk.  An empty chunk ends the
  -- stream via 'finish'; otherwise the input descriptor is pointed at
  -- the chunk and consumption resumes with the output buffer exactly as
  -- it was left (opos bytes already written), so the Haskell-side
  -- position keeps mirroring the descriptor's.
  advanceInput cxfp ibfp obfp opos dfp = do
    let prompt (PS fp off len)
          | len == 0 = finish cxfp obfp opos dfp
          | otherwise = do
              withForeignPtr fp $ \sp0 ->
                withForeignPtr ibfp $ \ibuf ->
                  poke ibuf (buffer (sp0 `plusPtr` off) (fromIntegral len))
              consume cxfp ibfp 0 len obfp opos dfp fp
    return (Consume prompt)
  -- Allocate a fresh outSize-byte output buffer and point the output
  -- descriptor at it, starting from position 0.
  newOutput obfp = do
    dfp <- mallocByteString outSize
    withForeignPtr dfp $ \dp ->
      withForeignPtr obfp $ \obuf ->
        poke obuf (buffer dp (fromIntegral outSize))
    return dfp
  -- Drive consumeBlock over the current chunk:
  --   * chunk fully consumed -> ask for more input (checked first, so a
  --     full output buffer is carried over to the next chunk);
  --   * output buffer full   -> emit it and continue with a new buffer;
  --   * otherwise            -> run one step and re-read both positions.
  -- fp (the chunk's storage) is touched after each step so it stays
  -- alive while the C code reads from it.
  consume cxfp ibfp ipos ilen obfp opos dfp fp = do
    if | fromIntegral ipos == ilen -> advanceInput cxfp ibfp obfp opos dfp
       | opos == outSize -> do
           let go = do
                 ndfp <- newOutput obfp
                 consume cxfp ibfp ipos ilen obfp 0 ndfp fp
           return (Produce (PS dfp 0 opos) go)
       | otherwise -> do
           let obuf = unsafeForeignPtrToPtr obfp
               ibuf = unsafeForeignPtrToPtr ibfp
           check "consumeBlock"
             (withForeignPtr cxfp $ \cptr ->
               consumeBlock cptr obuf ibuf <* touchForeignPtr fp) $ \_ -> do
             opos1 <- fromIntegral `fmap` peekPos obuf
             ipos1 <- peekPos ibuf
             touchForeignPtr obfp
             touchForeignPtr ibfp
             consume cxfp ibfp ipos1 ilen obfp opos1 dfp fp

-- | Begin a streaming decompression operation.
--
-- The initial result will be either an 'Error' or a 'Consume'.
--
-- If the last output buffer is completely full when the end of input is
-- signalled, the decoder may still be holding decompressed data
-- internally.  That buffer is emitted as a 'Produce' and the decoder is
-- drained with zero-length input until a partially filled buffer is
-- obtained, which is returned in 'Done'.
decompress :: IO Result
decompress =
  streaming
  createDStream
  p_freeDStream
  outSize
  initDStream
  decompressStream
  finish
 where
  outSize = fromIntegral dstreamOutSize
  -- A partially filled buffer means the decoder flushed everything it
  -- could.  A full buffer may have left data behind inside the decoder,
  -- so emit it and drain what remains.
  finish cxfp obfp opos dfp
    | opos < outSize = Done `fmap` shrink outSize dfp opos
    | otherwise      = return (Produce (PS dfp 0 opos) (drain cxfp obfp))
  -- Point the output descriptor at a fresh buffer and run one more
  -- decompression step with no input, then re-examine the result.
  drain cxfp obfp = do
    ibfp <- newForeignPtr finalizerFree =<< malloc
    -- Zero-length input: the source pointer is never read, it only needs
    -- to be non-null, so the descriptor's own address is reused.
    withForeignPtr ibfp $ \ibuf -> poke ibuf (buffer ibuf 0)
    ndfp <- mallocByteString outSize
    withForeignPtr ndfp $ \dp ->
      withForeignPtr obfp $ \obuf ->
        poke obuf (buffer dp (fromIntegral outSize))
    check "consumeBlock"
      (withForeignPtr cxfp $ \cptr ->
         withForeignPtr obfp $ \obuf ->
           withForeignPtr ibfp $ \ibuf ->
             decompressStream cptr obuf ibuf <* touchForeignPtr ndfp)
      $ \_ -> do
        opos1 <- fromIntegral `fmap` withForeignPtr obfp peekPos
        finish cxfp obfp opos1 ndfp

-- | Wrap the first @opos@ bytes of an output buffer of @capacity@ bytes
-- as a 'B.ByteString'.
--
-- When a significant part of the buffer is unused, the bytes are copied
-- into a right-sized 'B.ByteString' so the larger buffer need not be
-- retained.  "Significant" means at least 1 KiB (matters for large
-- buffers) or more than an eighth of the capacity (matters for small
-- buffers).  Otherwise the buffer is shared without copying.
shrink :: Int -> ForeignPtr Word8 -> Int -> IO B.ByteString
shrink capacity dfp opos
  | opos == 0 = return B.empty
  | wasteful  = return (B.copy used)
  | otherwise = return used
  where
    used     = PS dfp 0 opos
    unused   = capacity - opos
    -- Previously `capacity \`rem\` 8` (always 0..7), which made the
    -- 1 KiB clause dead and copied nearly every partial buffer.
    wasteful = unused >= 1024 || unused > capacity `quot` 8

-- | Build a buffer descriptor covering @len@ bytes starting at @start@,
-- with its position field initialised to zero.
buffer :: Ptr a -> CSize -> Buffer io
buffer start len = Buffer start len startPos
  where
    startPos = 0

-- | Run a zstd call returning a status code.  An error code becomes an
-- 'Error' tagged with the given step name and zstd's error description;
-- any other value is passed on to the continuation.
check :: String -> IO CSize -> (CSize -> IO Result) -> IO Result
check name act onSuccess = act >>= dispatch
  where
    dispatch code
      | isError code = return (Error name (getErrorName code))
      | otherwise    = onSuccess code