1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
|
import unittest
import zstandard as zstd
class TestDecompressor_decompress(unittest.TestCase):
    """Tests for the one-shot ``ZstdDecompressor.decompress()`` API."""

    @staticmethod
    def _dictionary_samples():
        """Return the sample corpus used to train test dictionaries.

        The corpus is 128 repetitions of seven short repeated-pattern
        byte strings, in a fixed order.
        """
        patterns = [
            b"foo" * 64,
            b"bar" * 64,
            b"foobar" * 64,
            b"qwert" * 64,
            b"yuiop" * 64,
            b"asdfg" * 64,
            b"hijkl" * 64,
        ]
        return patterns * 128

    def test_empty_input(self):
        """Decompressing an empty byte string raises ``ZstdError``."""
        dctx = zstd.ZstdDecompressor()

        with self.assertRaisesRegex(
            zstd.ZstdError, "error determining content size from frame header"
        ):
            dctx.decompress(b"")

    def test_invalid_input(self):
        """Decompressing bytes that are not a zstd frame raises ``ZstdError``."""
        dctx = zstd.ZstdDecompressor()

        with self.assertRaisesRegex(
            zstd.ZstdError, "error determining content size from frame header"
        ):
            dctx.decompress(b"foobar")

    def test_input_types(self):
        """``decompress()`` accepts memoryview and bytearray inputs."""
        cctx = zstd.ZstdCompressor(level=1)
        compressed = cctx.compress(b"foo")

        mutable_array = bytearray(len(compressed))
        mutable_array[:] = compressed

        sources = [
            memoryview(compressed),
            bytearray(compressed),
            mutable_array,
        ]

        dctx = zstd.ZstdDecompressor()
        for source in sources:
            # subTest makes a failure report which buffer type broke and
            # lets the remaining types still be exercised.
            with self.subTest(source_type=type(source).__name__):
                self.assertEqual(dctx.decompress(source), b"foo")

    def test_no_content_size_in_frame(self):
        """A frame without a content size fails when no output size is given."""
        cctx = zstd.ZstdCompressor(write_content_size=False)
        compressed = cctx.compress(b"foobar")

        dctx = zstd.ZstdDecompressor()
        with self.assertRaisesRegex(
            zstd.ZstdError, "could not determine content size in frame header"
        ):
            dctx.decompress(compressed)

    def test_content_size_present(self):
        """A frame produced by a default compressor round-trips."""
        cctx = zstd.ZstdCompressor()
        compressed = cctx.compress(b"foobar")

        dctx = zstd.ZstdDecompressor()
        decompressed = dctx.decompress(compressed)
        self.assertEqual(decompressed, b"foobar")

    def test_empty_roundtrip(self):
        """Compressing then decompressing empty input yields empty bytes."""
        cctx = zstd.ZstdCompressor()
        compressed = cctx.compress(b"")

        dctx = zstd.ZstdDecompressor()
        decompressed = dctx.decompress(compressed)
        self.assertEqual(decompressed, b"")

    def test_max_output_size(self):
        """``max_output_size`` must be at least the decompressed length."""
        cctx = zstd.ZstdCompressor(write_content_size=False)
        source = b"foobar" * 256
        compressed = cctx.compress(source)

        dctx = zstd.ZstdDecompressor()

        # Will fit into buffer exactly the size of input.
        decompressed = dctx.decompress(compressed, max_output_size=len(source))
        self.assertEqual(decompressed, source)

        # Input size - 1 fails
        with self.assertRaisesRegex(
            zstd.ZstdError, "decompression error: did not decompress full frame"
        ):
            dctx.decompress(compressed, max_output_size=len(source) - 1)

        # Input size + 1 works
        decompressed = dctx.decompress(
            compressed, max_output_size=len(source) + 1
        )
        self.assertEqual(decompressed, source)

        # A much larger buffer works.
        decompressed = dctx.decompress(
            compressed, max_output_size=len(source) * 64
        )
        self.assertEqual(decompressed, source)

    def test_stupidly_large_output_buffer(self):
        """An enormous ``max_output_size`` raises instead of succeeding."""
        cctx = zstd.ZstdCompressor(write_content_size=False)
        compressed = cctx.compress(b"foobar" * 256)

        dctx = zstd.ZstdDecompressor()

        # Will get OverflowError on some Python distributions that can't
        # handle really large integers.
        with self.assertRaises((MemoryError, OverflowError)):
            dctx.decompress(compressed, max_output_size=2**62)

    def test_dictionary(self):
        """Data compressed with a trained dictionary round-trips with it."""
        d = zstd.train_dictionary(8192, self._dictionary_samples())

        orig = b"foobar" * 16384
        cctx = zstd.ZstdCompressor(level=1, dict_data=d)
        compressed = cctx.compress(orig)

        dctx = zstd.ZstdDecompressor(dict_data=d)
        decompressed = dctx.decompress(compressed)

        self.assertEqual(decompressed, orig)

    def test_dictionary_multiple(self):
        """One dictionary-backed decompressor handles several frames in turn."""
        d = zstd.train_dictionary(8192, self._dictionary_samples())

        sources = (b"foobar" * 8192, b"foo" * 8192, b"bar" * 8192)

        cctx = zstd.ZstdCompressor(level=1, dict_data=d)
        compressed = [cctx.compress(source) for source in sources]

        dctx = zstd.ZstdDecompressor(dict_data=d)
        for frame, source in zip(compressed, sources):
            self.assertEqual(dctx.decompress(frame), source)

    def test_max_window_size(self):
        """A frame needing more than ``max_window_size`` is rejected."""
        with open(__file__, "rb") as fh:
            source = fh.read()

        # If we write a content size, the decompressor engages single pass
        # mode and the window size doesn't come into play.
        cctx = zstd.ZstdCompressor(write_content_size=False)
        frame = cctx.compress(source)

        dctx = zstd.ZstdDecompressor(max_window_size=2**zstd.WINDOWLOG_MIN)

        with self.assertRaisesRegex(
            zstd.ZstdError,
            "decompression error: Frame requires too much memory",
        ):
            dctx.decompress(frame, max_output_size=len(source))

    def test_explicit_default_params(self):
        """Explicitly passing these constructor arguments still decompresses."""
        cctx = zstd.ZstdCompressor(level=1)
        compressed = cctx.compress(b"foo")

        dctx = zstd.ZstdDecompressor(
            dict_data=None,
            max_window_size=0,
            format=zstd.FORMAT_ZSTD1,
        )
        self.assertEqual(dctx.decompress(compressed), b"foo")

    def test_multiple_frames(self):
        """Only the first of two concatenated frames is decompressed."""
        cctx = zstd.ZstdCompressor()
        foo = cctx.compress(b"foo")
        bar = cctx.compress(b"bar")

        dctx = zstd.ZstdDecompressor()

        # With or without allow_extra_data=True, the trailing frame is
        # ignored and only the first frame's content is returned.
        self.assertEqual(dctx.decompress(foo + bar), b"foo")
        self.assertEqual(
            dctx.decompress(foo + bar, allow_extra_data=True), b"foo"
        )

        # The "." is escaped so the pattern matches the literal attribute
        # path rather than any character in that position.
        with self.assertRaisesRegex(
            zstd.ZstdError,
            r"ZstdDecompressor\.read_across_frames=True is not yet implemented",
        ):
            dctx.decompress(foo + bar, read_across_frames=True)

        with self.assertRaisesRegex(
            zstd.ZstdError,
            "%d bytes of unused data, which is disallowed" % len(bar),
        ):
            dctx.decompress(foo + bar, allow_extra_data=False)

    def test_junk_after_frame(self):
        """Trailing non-frame bytes are ignored unless explicitly disallowed."""
        cctx = zstd.ZstdCompressor()
        frame = cctx.compress(b"foo")

        dctx = zstd.ZstdDecompressor()

        self.assertEqual(dctx.decompress(frame + b"junk"), b"foo")
        self.assertEqual(
            dctx.decompress(frame + b"junk", allow_extra_data=True), b"foo"
        )

        with self.assertRaisesRegex(
            zstd.ZstdError, "4 bytes of unused data, which is disallowed"
        ):
            dctx.decompress(frame + b"junk", allow_extra_data=False)
|