1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
import lz4.frame as lz4frame
import os
import pytest
from . helpers import (
get_frame_info_check,
get_chunked,
)
# Payloads shared by the round-trip tests: the empty input, followed by four
# buffers made by repeating a freshly drawn 32-byte random block
# (32 * 32 = 1024 bytes per unit, times the repeat count).
test_data = [b''] + [
    repeats * (32 * os.urandom(32))
    for repeats in (128, 256, 512, 1024)
]
@pytest.fixture(
    params=test_data,
    ids=['data' + str(index) for index, _ in enumerate(test_data)],
)
def data(request):
    """Provide each entry of ``test_data`` in turn.

    The ids are ``data0``, ``data1``, ... following the position of the
    payload in ``test_data``.
    """
    return request.param
@pytest.fixture(params=[True, False])
def reset(request):
    """Provide ``True`` and ``False`` in turn.

    The tests in this module use the value to decide whether to call
    ``compressor.reset()`` and compress the payload a second time.
    """
    return request.param
@pytest.fixture(params=[1, 8])
def chunks(request):
    """Provide ``1`` and ``8`` in turn.

    The value is handed to ``get_chunked`` as its second argument --
    presumably the number of chunks to split into; confirm in helpers.
    """
    return request.param
def test_roundtrip_LZ4FrameCompressor(
        data,
        chunks,
        block_size,
        block_linked,
        reset,
        store_size,
        block_checksum,
        content_checksum):
    """Compress with ``LZ4FrameCompressor``, decompress in one call.

    The payload is fed to the compressor chunk by chunk (chunks come from
    ``get_chunked``). When ``reset`` is ``True`` the compressor is reset
    and the whole compression is repeated, and the second frame is the one
    that gets checked. The frame is passed to ``get_frame_info_check``
    together with the settings used, then decompressed with
    ``lz4frame.decompress``; the output must equal ``data`` and the
    reported number of bytes read must equal the frame length.

    All parameters are pytest fixtures; those not defined in this module
    are expected to come from elsewhere in the test suite.
    """
    compressor_options = dict(
        block_size=block_size,
        block_linked=block_linked,
        content_checksum=content_checksum,
        block_checksum=block_checksum,
    )

    with lz4frame.LZ4FrameCompressor(**compressor_options) as compressor:

        def build_frame():
            # The source size is only passed to begin() when the
            # store_size fixture is exactly True.
            if store_size is True:
                header = compressor.begin(source_size=len(data))
            else:
                header = compressor.begin()

            # Collect header, compressed chunks and the flushed tail in
            # call order, then concatenate once.
            parts = [header]
            parts.extend(
                compressor.compress(piece)
                for piece in get_chunked(data, chunks)
            )
            parts.append(compressor.flush())
            return b''.join(parts)

        compressed = build_frame()

        if reset is True:
            # Reuse the same compressor object for a second, fresh frame.
            compressor.reset()
            compressed = build_frame()

    get_frame_info_check(
        compressed,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    result = lz4frame.decompress(compressed, return_bytes_read=True)
    decompressed, bytes_read = result

    assert data == decompressed
    assert bytes_read == len(compressed)
def test_roundtrip_LZ4FrameCompressor_LZ4FrameDecompressor(
        data,
        chunks,
        block_size,
        block_linked,
        reset,
        store_size,
        block_checksum,
        content_checksum):
    """Compress with ``LZ4FrameCompressor``, decompress chunk by chunk.

    The payload is fed to the compressor chunk by chunk (chunks come from
    ``get_chunked``). When ``reset`` is ``True`` the compressor is reset
    and the whole compression is repeated, and the second frame is the one
    that gets checked. The frame is passed to ``get_frame_info_check``
    together with the settings used, then split with ``get_chunked`` and
    fed piecewise to an ``LZ4FrameDecompressor``; the concatenated output
    must equal ``data``.

    All parameters are pytest fixtures; those not defined in this module
    are expected to come from elsewhere in the test suite.
    """
    compressor_options = dict(
        block_size=block_size,
        block_linked=block_linked,
        content_checksum=content_checksum,
        block_checksum=block_checksum,
    )

    with lz4frame.LZ4FrameCompressor(**compressor_options) as compressor:

        def build_frame():
            # The source size is only passed to begin() when the
            # store_size fixture is exactly True.
            if store_size is True:
                header = compressor.begin(source_size=len(data))
            else:
                header = compressor.begin()

            # Collect header, compressed chunks and the flushed tail in
            # call order, then concatenate once.
            parts = [header]
            parts.extend(
                compressor.compress(piece)
                for piece in get_chunked(data, chunks)
            )
            parts.append(compressor.flush())
            return b''.join(parts)

        compressed = build_frame()

        if reset is True:
            # Reuse the same compressor object for a second, fresh frame.
            compressor.reset()
            compressed = build_frame()

    get_frame_info_check(
        compressed,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    with lz4frame.LZ4FrameDecompressor() as decompressor:
        # Feed the frame to the decompressor one chunk at a time and
        # stitch the outputs together in order.
        decompressed = b''.join(
            decompressor.decompress(piece)
            for piece in get_chunked(compressed, chunks)
        )

    assert data == decompressed
|