import lz4.frame as lz4frame
import pytest
import os
import sys
from .helpers import (
    get_chunked,
    get_frame_info_check,
)


# Each entry is (data, number of compression chunks, number of decompression chunks).
test_data = [
    (b'', 1, 1),
    (os.urandom(8 * 1024), 8, 1),
    (os.urandom(8 * 1024), 1, 8),
    (b'0' * 8 * 1024, 8, 1),
    (b'0' * 8 * 1024, 8, 1),
    (bytearray(b''), 1, 1),
    (bytearray(os.urandom(8 * 1024)), 8, 1),
]
if sys.version_info > (2, 7):
    test_data += [
        (memoryview(b''), 1, 1),
        (memoryview(os.urandom(8 * 1024)), 8, 1)
    ]


@pytest.fixture(
    params=test_data,
    ids=[
        'data' + str(i) for i in range(len(test_data))
    ]
)
def data(request):
    return request.param


def test_roundtrip_chunked(data, block_size, block_linked,
                           content_checksum, block_checksum,
                           compression_level,
                           auto_flush, store_size):

    data, c_chunks, d_chunks = data

    c_context = lz4frame.create_compression_context()

    kwargs = {}
    kwargs['compression_level'] = compression_level
    kwargs['block_size'] = block_size
    kwargs['block_linked'] = block_linked
    kwargs['content_checksum'] = content_checksum
    kwargs['block_checksum'] = block_checksum
    kwargs['auto_flush'] = auto_flush
    if store_size is True:
        kwargs['source_size'] = len(data)

    # Begin the frame, then append the output of each compressed chunk.
    compressed = lz4frame.compress_begin(
        c_context,
        **kwargs
    )
    data_in = get_chunked(data, c_chunks)
    try:
        while True:
            compressed += lz4frame.compress_chunk(
                c_context,
                next(data_in)
            )
    except StopIteration:
        pass
    finally:
        del data_in

    compressed += lz4frame.compress_flush(c_context)

    # Check that the frame header reflects the options used above.
    get_frame_info_check(
        compressed,
        len(data),
        store_size,
        block_size,
        block_linked,
        content_checksum,
        block_checksum,
    )

    # Decompress chunk by chunk, recording the bytes consumed and the
    # end-of-frame flag returned for every chunk.
    d_context = lz4frame.create_decompression_context()
    compressed_in = get_chunked(compressed, d_chunks)
    decompressed = b''
    bytes_read = 0
    eofs = []
    try:
        while True:
            d, b, e = lz4frame.decompress_chunk(
                d_context,
                next(compressed_in),
            )
            decompressed += d
            bytes_read += b
            eofs.append(e)
    except StopIteration:
        pass
    finally:
        del compressed_in

    assert bytes_read == len(compressed)
    assert decompressed == data
    # The end-of-frame flag must be set on the final chunk and must not be
    # set on the chunks checked before it.
    assert eofs[-1] == True
    assert True not in eofs[:-2]