"""Fixture and round-trip tests for the ppmd Ppmd7Encoder and Ppmd7Decoder."""
import hashlib
import io
import os
import pathlib
import ppmd
# Directory next to this module that holds the fixture files used below.
testdata_path = pathlib.Path(os.path.dirname(__file__)) / 'data'

# Sample plaintext: two 33-byte sentences, 66 bytes in total.
data = (
    b'This file is located in a folder.'
    b'This file is located in the root.'
)

# Number of bytes read from the CSV fixture per chunk in the round-trip test.
READ_BLOCKSIZE = 16 * 1024
def test_ppmd_encoder():
    """Encode the sample payload in a single call and compare with the fixture.

    The compressed stream must be 41 bytes long and byte-identical to the
    contents of ``ppmd7.dat`` in the test data directory.
    """
    with io.BytesIO() as sink:
        with ppmd.Ppmd7Encoder(sink, 6, 16 << 20) as enc:
            enc.encode(data)
            enc.flush()
        # Snapshot the buffer before the BytesIO is closed.
        compressed = sink.getvalue()
    assert len(compressed) == 41
    expected = (testdata_path / 'ppmd7.dat').read_bytes()
    assert compressed == expected
def test_ppmd_encoder2():
    """Encode the sample payload in two calls and compare with the fixture.

    The payload is split at byte 33; the resulting stream must still be
    41 bytes long and byte-identical to ``ppmd7.dat``.
    """
    pieces = (data[:33], data[33:])
    with io.BytesIO() as sink:
        with ppmd.Ppmd7Encoder(sink, 6, 16 << 20) as enc:
            for piece in pieces:
                enc.encode(piece)
            enc.flush()
        # Snapshot the buffer before the BytesIO is closed.
        compressed = sink.getvalue()
    assert len(compressed) == 41
    expected = (testdata_path / 'ppmd7.dat').read_bytes()
    assert compressed == expected
def test_ppmd_decoder():
    """Decode ``ppmd7.dat`` in two 33-byte requests and compare with the sample."""
    fixture = testdata_path / 'ppmd7.dat'
    with fixture.open('rb') as src, ppmd.Ppmd7Decoder(src, 6, 16 << 20) as dec:
        first_half = dec.decode(33)
        second_half = dec.decode(33)
        assert first_half + second_half == data
def test_ppmd_encode_decode(tmp_path):
    """Round-trip a CSV fixture through Ppmd7Encoder and Ppmd7Decoder.

    The fixture ``10000SalesRecords.csv`` is compressed chunk by chunk into
    ``target.ppmd`` under *tmp_path*, then decompressed into ``target.csv``.
    The SHA-256 digest of the bytes fed to the encoder must equal the digest
    of the bytes returned by the decoder.

    Args:
        tmp_path: directory for the intermediate files (a path object
            providing ``joinpath``).
    """
    source_path = testdata_path.joinpath('10000SalesRecords.csv')
    packed_path = tmp_path.joinpath('target.ppmd')
    unpacked_path = tmp_path.joinpath('target.csv')

    # Compress, hashing and counting the plaintext on the way in.
    length = 0
    m = hashlib.sha256()
    with source_path.open('rb') as f:
        with packed_path.open('wb') as target:
            with ppmd.Ppmd7Encoder(target, 6, 16 << 20) as enc:
                # Named ``block`` so the module-level ``data`` sample is
                # not shadowed inside this test.
                block = f.read(READ_BLOCKSIZE)
                while len(block) > 0:
                    m.update(block)
                    length += len(block)
                    enc.encode(block)
                    block = f.read(READ_BLOCKSIZE)
                enc.flush()
    shash = m.digest()

    # Decompress exactly ``length`` bytes, hashing them on the way out.
    m2 = hashlib.sha256()
    with packed_path.open('rb') as target:
        with unpacked_path.open('wb') as out:
            with ppmd.Ppmd7Decoder(target, 6, 16 << 20) as dec:
                remaining = length
                while remaining > 0:
                    max_length = min(remaining, READ_BLOCKSIZE)
                    res = dec.decode(max_length)
                    # An empty result would never shrink ``remaining``;
                    # fail here instead of looping forever.
                    assert len(res) > 0, (
                        'decoder returned no data with %d bytes still expected'
                        % remaining
                    )
                    remaining -= len(res)
                    m2.update(res)
                    out.write(res)
                # The loop exits with remaining <= 0, so this final call asks
                # for no further payload. Kept from the original test.
                # NOTE(review): presumably exercises a zero-length decode;
                # confirm whether it is still needed.
                res = dec.decode(remaining)
                remaining -= len(res)
                m2.update(res)
                out.write(res)
    thash = m2.digest()
    assert thash == shash
|