1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
import unittest
import zstandard as zstd
class TestDecompressor_content_dict_chain(unittest.TestCase):
def test_bad_inputs_simple(self):
dctx = zstd.ZstdDecompressor()
with self.assertRaises(TypeError):
dctx.decompress_content_dict_chain(b"foo")
with self.assertRaises(TypeError):
dctx.decompress_content_dict_chain((b"foo", b"bar"))
with self.assertRaisesRegex(ValueError, "empty input chain"):
dctx.decompress_content_dict_chain([])
with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"):
dctx.decompress_content_dict_chain(["foo"])
with self.assertRaisesRegex(ValueError, "chunk 0 must be bytes"):
dctx.decompress_content_dict_chain([True])
with self.assertRaisesRegex(
ValueError, "chunk 0 is too small to contain a zstd frame"
):
dctx.decompress_content_dict_chain([zstd.FRAME_HEADER])
with self.assertRaisesRegex(
ValueError, "chunk 0 is not a valid zstd frame"
):
dctx.decompress_content_dict_chain([b"foo" * 8])
no_size = zstd.ZstdCompressor(write_content_size=False).compress(
b"foo" * 64
)
with self.assertRaisesRegex(
ValueError, "chunk 0 missing content size in frame"
):
dctx.decompress_content_dict_chain([no_size])
# Corrupt first frame.
frame = zstd.ZstdCompressor().compress(b"foo" * 64)
frame = frame[0:12] + frame[15:]
with self.assertRaisesRegex(
zstd.ZstdError, "chunk 0 did not decompress full frame"
):
dctx.decompress_content_dict_chain([frame])
def test_bad_subsequent_input(self):
initial = zstd.ZstdCompressor().compress(b"foo" * 64)
dctx = zstd.ZstdDecompressor()
with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"):
dctx.decompress_content_dict_chain([initial, "foo"])
with self.assertRaisesRegex(ValueError, "chunk 1 must be bytes"):
dctx.decompress_content_dict_chain([initial, None])
with self.assertRaisesRegex(
ValueError, "chunk 1 is too small to contain a zstd frame"
):
dctx.decompress_content_dict_chain([initial, zstd.FRAME_HEADER])
with self.assertRaisesRegex(
ValueError, "chunk 1 is not a valid zstd frame"
):
dctx.decompress_content_dict_chain([initial, b"foo" * 8])
no_size = zstd.ZstdCompressor(write_content_size=False).compress(
b"foo" * 64
)
with self.assertRaisesRegex(
ValueError, "chunk 1 missing content size in frame"
):
dctx.decompress_content_dict_chain([initial, no_size])
# Corrupt second frame.
cctx = zstd.ZstdCompressor(
dict_data=zstd.ZstdCompressionDict(b"foo" * 64)
)
frame = cctx.compress(b"bar" * 64)
frame = frame[0:12] + frame[15:]
with self.assertRaisesRegex(
zstd.ZstdError, "chunk 1 did not decompress full frame"
):
dctx.decompress_content_dict_chain([initial, frame])
def test_simple(self):
original = [
b"foo" * 64,
b"foobar" * 64,
b"baz" * 64,
b"foobaz" * 64,
b"foobarbaz" * 64,
]
chunks = []
chunks.append(zstd.ZstdCompressor().compress(original[0]))
for i, chunk in enumerate(original[1:]):
d = zstd.ZstdCompressionDict(original[i])
cctx = zstd.ZstdCompressor(dict_data=d)
chunks.append(cctx.compress(chunk))
for i in range(1, len(original)):
chain = chunks[0:i]
expected = original[i - 1]
dctx = zstd.ZstdDecompressor()
decompressed = dctx.decompress_content_dict_chain(chain)
self.assertEqual(decompressed, expected)
|