1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
import lz4.stream
import pytest
import sys
if sys.version_info < (3, ):
from struct import pack, unpack
def _get_format(length, byteorder, signed):
_order = {'l': '<', 'b': '>'}
_fmt = {1: 'b', 2: 'h', 4: 'i', 8: 'q'}
_sign = {True: lambda x: x.lower(), False: lambda x: x.upper()}
return _sign[signed](_order[byteorder[0].lower()] + _fmt[length])
def int_to_bytes(value, length=4, byteorder='little', signed=False):
return bytearray(pack(_get_format(length, byteorder, signed), value))
def int_from_bytes(bytes, byteorder='little', signed=False):
return unpack(_get_format(len(bytes), byteorder, signed), bytes)[0]
else:
def int_to_bytes(value, length=4, byteorder='little', signed=False):
return value.to_bytes(length, byteorder, signed=signed)
def int_from_bytes(bytes, byteorder='little', signed=False):
return int.from_bytes(bytes, byteorder, signed=signed)
# Out-of-band block size record tests
def test_round_trip():
data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 4}
oob_kwargs = {}
oob_kwargs.update(kwargs)
oob_kwargs['store_comp_size'] = 0
ib_cstream = bytearray()
oob_cstream = bytearray()
oob_sizes = []
with lz4.stream.LZ4StreamCompressor(**kwargs) as ib_proc, \
lz4.stream.LZ4StreamCompressor(**oob_kwargs) as oob_proc:
for start in range(0, len(data), kwargs['buffer_size']):
chunk = data[start:start + kwargs['buffer_size']]
ib_block = ib_proc.compress(chunk)
oob_block = oob_proc.compress(chunk)
assert (len(ib_block) == (len(oob_block) + kwargs['store_comp_size'])), \
"Blocks size mismatch: " \
"{}/{}".format(len(ib_block), len(oob_block) + kwargs['store_comp_size'])
assert (int_from_bytes(ib_block[:kwargs['store_comp_size']]) == len(oob_block)), \
"Blocks size record mismatch: got {}, expected {}".format(
int_from_bytes(ib_block[:kwargs['store_comp_size']]),
len(oob_block))
assert (ib_block[kwargs['store_comp_size']:] == oob_block), "Blocks data mismatch"
ib_cstream += ib_block
oob_cstream += oob_block
oob_sizes.append(len(oob_block))
ib_dstream = bytearray()
oob_dstream = bytearray()
with lz4.stream.LZ4StreamDecompressor(**kwargs) as ib_proc, \
lz4.stream.LZ4StreamDecompressor(**oob_kwargs) as oob_proc:
ib_offset = 0
oob_index = 0
oob_offset = 0
while ib_offset < len(ib_cstream) and oob_index < len(oob_sizes):
ib_block = ib_proc.get_block(ib_cstream[ib_offset:])
oob_block = oob_cstream[oob_offset:oob_offset + oob_sizes[oob_index]]
assert (len(ib_block) == len(oob_block)), \
"Blocks size mismatch: {}/{}".format(len(ib_block), len(oob_block))
assert (ib_block == oob_block), "Blocks data mismatch"
ib_chunk = ib_proc.decompress(ib_block)
oob_chunk = oob_proc.decompress(oob_block)
assert (len(ib_chunk) == len(oob_chunk)), \
"Chunks size mismatch: {}/{}".format(len(ib_chunk), len(oob_chunk))
assert (ib_chunk == oob_chunk), "Chunks data mismatch"
ib_dstream += ib_chunk
oob_dstream += oob_chunk
ib_offset += kwargs['store_comp_size'] + len(ib_block)
oob_offset += oob_sizes[oob_index]
oob_index += 1
assert (len(ib_dstream) == len(oob_dstream)), "Decompressed streams length mismatch"
assert (len(data) == len(ib_dstream)), "Decompressed streams length mismatch"
assert (len(data) == len(oob_dstream)), "Decompressed streams length mismatch"
assert (ib_dstream == oob_dstream), "Decompressed streams mismatch"
assert (data == ib_dstream), "Decompressed streams mismatch"
assert (data == oob_dstream), "Decompressed streams mismatch"
def test_invalid_usage():
data = b"2099023098234882923049823094823094898239230982349081231290381209380981203981209381238901283098908123109238098123" * 24
kwargs = {'strategy': "double_buffer", 'buffer_size': 256, 'store_comp_size': 0}
cstream = bytearray()
oob_sizes = []
with lz4.stream.LZ4StreamCompressor(**kwargs) as proc:
for start in range(0, len(data), kwargs['buffer_size']):
chunk = data[start:start + kwargs['buffer_size']]
block = proc.compress(chunk)
cstream += block
oob_sizes.append(len(block))
message = r"^LZ4 context is configured for storing block size out-of-band$"
with pytest.raises(lz4.stream.LZ4StreamError, match=message):
dstream = bytearray()
with lz4.stream.LZ4StreamDecompressor(**kwargs) as proc:
offset = 0
index = 0
while offset < len(cstream):
block = proc.get_block(cstream[offset:])
chunk = proc.decompress(block)
dstream += chunk
offset += kwargs['store_comp_size'] + len(block)
index += 1
|