# Round-trip tests for multi-frame data using the lz4.frame bindings.
import lz4.frame as lz4frame
import pytest
import os
# Payloads handed to the tests through the ``data`` fixture.  The single entry
# is one random 32-byte pattern repeated 256 times (8192 bytes in total).
test_data = [os.urandom(32) * 256]
@pytest.fixture(
    params=test_data,
    # One id per payload: 'data0', 'data1', ...
    ids=['data' + str(index) for index, _ in enumerate(test_data)],
)
def data(request):
    """Parametrized fixture returning one entry of ``test_data`` per run."""
    payload = request.param
    return payload
def test_roundtrip_multiframe_1(data):
    """Round-trip concatenated frames through the one-shot functions.

    ``lz4frame.compress`` is called ``frame_count`` times on the same payload
    and the results are concatenated.  ``lz4frame.decompress`` is then called
    ``frame_count`` times, each time on the whole concatenated buffer, and the
    outputs are concatenated.  The result must equal the payload repeated
    ``frame_count`` times.
    """
    frame_count = 4

    stream = b''.join(lz4frame.compress(data) for _ in range(frame_count))

    # NOTE(review): every call is given the full buffer, so the assertions
    # below only hold if each call yields exactly one copy of ``data`` --
    # presumably decompress() stops after the first frame; confirm against
    # the lz4.frame documentation.
    restored = b''.join(
        lz4frame.decompress(stream) for _ in range(frame_count)
    )

    assert len(restored) == frame_count * len(data)
    assert restored == data * frame_count
def test_roundtrip_multiframe_2(data):
    """Round-trip frames built with the context-based compression calls.

    A single compression context is used to emit ``frame_count`` frames, each
    via ``compress_begin`` / ``compress_chunk`` / ``compress_flush``.  The
    concatenated output is then passed to ``lz4frame.decompress``
    ``frame_count`` times, and the joined result must equal the payload
    repeated ``frame_count`` times.
    """
    frame_count = 4

    compression_ctx = lz4frame.create_compression_context()
    pieces = []
    for _ in range(frame_count):
        # Tuple elements are evaluated left to right, so the calls happen in
        # begin -> chunk -> flush order for every frame.
        pieces.extend((
            lz4frame.compress_begin(compression_ctx),
            lz4frame.compress_chunk(compression_ctx, data),
            lz4frame.compress_flush(compression_ctx),
        ))
    stream = b''.join(pieces)

    # NOTE(review): each call receives the whole buffer; the assertions below
    # expect every call to return exactly one copy of ``data`` -- confirm
    # that decompress() only consumes the first frame.
    restored = b''.join(
        lz4frame.decompress(stream) for _ in range(frame_count)
    )

    assert len(restored) == frame_count * len(data)
    assert restored == data * frame_count
def test_roundtrip_multiframe_3(data):
    """Round-trip frames using the context-based calls in both directions.

    A single compression context emits ``nframes`` frames (begin / chunk /
    flush for each) which are concatenated.  ``decompress_chunk`` is then
    called ``nframes`` times with one decompression context, each time on the
    whole concatenated buffer.  Every call must report end-of-frame and must
    have consumed exactly one frame's share of the buffer; the joined output
    must equal the payload repeated ``nframes`` times.
    """
    nframes = 4

    compressed = b''
    compression_ctx = lz4frame.create_compression_context()
    for _ in range(nframes):
        compressed += lz4frame.compress_begin(compression_ctx)
        compressed += lz4frame.compress_chunk(compression_ctx, data)
        compressed += lz4frame.compress_flush(compression_ctx)

    # Each frame holds the same payload, so each call is expected to consume
    # an equal share of the buffer.  Computed once, outside the loop.
    expected_bytes_read = len(compressed) // nframes

    decompressed = b''
    # A distinct name keeps the decompression context from shadowing the
    # compression context used above.
    decompression_ctx = lz4frame.create_decompression_context()
    for _ in range(nframes):
        # NOTE(review): the same context is reused after end-of-frame and is
        # fed the start of the buffer again -- presumably the context is
        # ready for a new frame once eof is reported; confirm against the
        # lz4.frame documentation.
        chunk, bytes_read, eof = lz4frame.decompress_chunk(
            decompression_ctx, compressed
        )
        decompressed += chunk
        assert eof
        assert bytes_read == expected_bytes_read

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
def test_roundtrip_multiframe_4(data):
    """Round-trip frames through the compressor / decompressor classes.

    ``LZ4FrameCompressor`` emits ``nframes`` frames (begin / compress / flush
    for each) which are concatenated.  ``LZ4FrameDecompressor.decompress`` is
    then called ``nframes`` times: first on the whole buffer, afterwards on
    whatever the previous call left in ``unused_data``.  After every call the
    decompressor must report ``eof`` and ``needs_input``; ``unused_data`` must
    shrink by one frame's share each time and be ``None`` after the last
    frame.  The joined output must equal the payload repeated ``nframes``
    times.
    """
    nframes = 4

    compressed = b''
    with lz4frame.LZ4FrameCompressor() as compressor:
        for _ in range(nframes):
            compressed += compressor.begin()
            compressed += compressor.compress(data)
            compressed += compressor.flush()

    decompressed = b''
    with lz4frame.LZ4FrameDecompressor() as decompressor:
        for i in range(nframes):
            # The first call sees the full stream; later calls continue with
            # the bytes the previous call did not consume.
            if i == 0:
                pending = compressed
            else:
                pending = decompressor.unused_data
            decompressed += decompressor.decompress(pending)

            assert decompressor.eof
            assert decompressor.needs_input

            if i == nframes - 1:
                assert decompressor.unused_data is None
            else:
                # Floor division keeps the expected length an int, matching
                # the ``//`` used for the same computation in
                # test_roundtrip_multiframe_3.
                frames_left = nframes - i - 1
                expected_unused = len(compressed) * frames_left // nframes
                assert len(decompressor.unused_data) == expected_unused

    assert len(decompressed) == nframes * len(data)
    assert data * nframes == decompressed
|