File: test_ppmd8.py

package info (click to toggle)
python-pyppmd 1.2.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,648 kB
  • sloc: ansic: 5,644; python: 1,604; makefile: 15
file content (120 lines) | stat: -rw-r--r-- 4,055 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import hashlib
import os
import pathlib

import pytest

import pyppmd

# Directory holding the fixture files, resolved relative to this test module.
testdata_path = pathlib.Path(os.path.dirname(__file__)) / "data"

# Plain-text sample shared by the small encoder/decoder tests below.
source = b"This file is located in a folder.This file is located in the root.\n"

# Reference byte stream the tests below expect when `source` is compressed.
encoded = (
    b"\x54\x16\x43\x6d\x5c\xd8\xd7\x3a\xb3\x58\x31\xac\x1d\x09\x23\xfd\x11\xd5\x72\x62\x73"
    b"\x13\xb6\xce\xb2\xe7\x6a\xb9\xf6\xe8\x66\xf5\x08\xc3\x0a\x09\x36\x12\xeb\xda\xda\xba"
)

# Chunk size, in bytes, used for the streaming file reads.
READ_BLOCKSIZE = 16384


def test_ppmd8_encoder1():
    """Encode ``source`` in one call and compare the output with ``encoded``."""
    enc = pyppmd.Ppmd8Encoder(6, 8 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART)
    body = enc.encode(source)
    # flush() yields whatever the encoder still holds back after encode().
    tail = enc.flush()
    assert body + tail == encoded


def test_ppmd8_encoder2():
    """Encode ``source`` in two pieces (split at offset 33) and compare with ``encoded``.

    The concatenation of both ``encode`` results and the final ``flush`` must
    match the reference stream exactly.
    """
    enc = pyppmd.Ppmd8Encoder(6, 8 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART)
    head, rest = source[:33], source[33:]
    # List elements are evaluated left to right, so the call order is
    # encode(head), encode(rest), flush().
    pieces = [enc.encode(head), enc.encode(rest), enc.flush()]
    assert b"".join(pieces) == encoded


def test_ppmd8_decoder1():
    """Decode the whole ``encoded`` stream at once and expect ``source`` back."""
    dec = pyppmd.Ppmd8Decoder(6, 8 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART)
    decoded = dec.decode(encoded, -1)
    # A second call with empty input collects any output still pending.
    remainder = dec.decode(b"", -1)
    assert decoded + remainder == source
    # The decoder's eof / needs_input state is intentionally not asserted here:
    # assert decoder.eof and not decoder.needs_input


def test_ppmd8_decoder2():
    """Decode ``encoded`` in two pieces (split at offset 20) and expect ``source`` back."""
    dec = pyppmd.Ppmd8Decoder(6, 8 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART)
    first_half = dec.decode(encoded[:20])
    second_half = dec.decode(encoded[20:])
    # A final call with empty input collects any output still pending.
    leftover = dec.decode(b"", -1)
    assert first_half + second_half + leftover == source
    # The decoder's eof / needs_input state is intentionally not asserted here:
    # assert decoder.eof and not decoder.needs_input


# test mem_size less than original file size as well
@pytest.mark.parametrize(
    "mem_size, restore_method",
    [
        (8 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART),
        (8 << 20, pyppmd.PPMD8_RESTORE_METHOD_CUT_OFF),
        (1 << 20, pyppmd.PPMD8_RESTORE_METHOD_RESTART),
        (1 << 20, pyppmd.PPMD8_RESTORE_METHOD_CUT_OFF),
    ],
)
@pytest.mark.timeout(20)
def test_ppmd8_encode_decode(tmp_path, mem_size, restore_method):
    """Round-trip the sales-records CSV fixture through the PPMd8 encoder and decoder.

    The fixture is compressed block by block into ``target.ppmd`` and then
    decompressed block by block into ``target.csv``. Both passes must handle
    exactly 1237262 bytes, and the SHA-256 digest of the restored data must
    equal the digest of the original data.

    Args:
        tmp_path: pytest-provided temporary directory for the output files.
        mem_size: memory size handed to both the encoder and the decoder.
        restore_method: restore method handed to both the encoder and the decoder.
    """
    expected_size = 1237262
    csv_path = testdata_path.joinpath("10000SalesRecords.csv")
    packed_path = tmp_path.joinpath("target.ppmd")
    restored_path = tmp_path.joinpath("target.csv")

    # --- compression pass -------------------------------------------------
    consumed = 0
    src_digest = hashlib.sha256()
    with csv_path.open("rb") as src, packed_path.open("wb") as packed:
        enc = pyppmd.Ppmd8Encoder(6, mem_size, restore_method=restore_method)
        # Read fixed-size blocks until read() returns an empty bytes object.
        for chunk in iter(lambda: src.read(READ_BLOCKSIZE), b""):
            src_digest.update(chunk)
            consumed += len(chunk)
            packed.write(enc.encode(chunk))
        # The end mark written here is what lets the decoder report eof below.
        packed.write(enc.flush(endmark=True))
    assert consumed == expected_size

    # --- decompression pass -----------------------------------------------
    produced = 0
    out_digest = hashlib.sha256()
    with packed_path.open("rb") as packed, restored_path.open("wb") as restored:
        dec = pyppmd.Ppmd8Decoder(6, mem_size, restore_method=restore_method)
        chunk = packed.read(READ_BLOCKSIZE)
        while not dec.eof:
            plain = dec.decode(chunk)
            out_digest.update(plain)
            restored.write(plain)
            produced += len(plain)
            if not chunk:
                # Input exhausted: the empty chunk was already passed to the
                # decoder once above, so stop even if eof was never signalled.
                break
            chunk = packed.read(READ_BLOCKSIZE)
    assert produced == expected_size
    assert out_digest.digest() == src_digest.digest()


def test_ppmd8_encode_decode_shortage():
    """Round-trip a short UTF-8 payload, topping up the decoder input if needed.

    If the first ``decode`` call returns fewer bytes than were requested, one
    extra zero byte is supplied and the missing number of bytes is requested
    again. The combined output must equal the original payload.
    """
    payload = "\U0001127f\U00069f6a\U00069f6a".encode("UTF-8")
    wanted = len(payload)

    enc = pyppmd.Ppmd8Encoder(3, 2048)
    packed = enc.encode(payload) + enc.flush()

    dec = pyppmd.Ppmd8Decoder(3, 2048)
    restored = dec.decode(packed, wanted)
    missing = wanted - len(restored)
    if missing > 0:
        # Output came up short: pass a single NUL byte as additional input
        # and ask only for the bytes that are still outstanding.
        restored += dec.decode(b"\0", missing)
    assert payload == restored


def test_ppmdcompress():
    """Compress ``source`` with ``PpmdCompressor`` (variant "I") and compare with ``encoded``."""
    packer = pyppmd.PpmdCompressor(
        6,
        8 << 20,
        restore_method=pyppmd.PPMD8_RESTORE_METHOD_RESTART,
        variant="I",
    )
    body = packer.compress(source)
    tail = packer.flush()
    assert body + tail == encoded


def test_ppmddecompress():
    """Decompress ``encoded`` with ``PpmdDecompressor`` (variant "I") and expect ``source``."""
    unpacker = pyppmd.PpmdDecompressor(
        6,
        8 << 20,
        restore_method=pyppmd.PPMD8_RESTORE_METHOD_RESTART,
        variant="I",
    )
    restored = unpacker.decompress(encoded)
    assert restored == source