File: test_benchmark.py

package info (click to toggle)
python-pyppmd 1.1.1%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 2,632 kB
  • sloc: ansic: 5,638; python: 1,604; makefile: 15
file content (76 lines) | stat: -rw-r--r-- 2,888 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import io
import os
import pathlib

import pytest

import pyppmd

testdata_path = pathlib.Path(os.path.join(os.path.dirname(__file__), "data"))
testdata = testdata_path.joinpath("10000SalesRecords.csv")
src_size = testdata.stat().st_size
READ_BLOCKSIZE = 1048576

targets = [("PPMd H", 7, 6, 16 << 20), ("PPMd I", 8, 8, 8 << 20)]


@pytest.mark.benchmark(group="compress")
@pytest.mark.parametrize("name, var, max_order, mem_size", targets)
def test_benchmark_text_compress(tmp_path, benchmark, name, var, max_order, mem_size):
    cpuinfo = pytest.importorskip("cpuinfo")

    def encode(var, max_order, mem_size):
        if var == 7:
            encoder = pyppmd.Ppmd7Encoder(max_order=max_order, mem_size=mem_size)
        else:
            encoder = pyppmd.Ppmd8Encoder(max_order=max_order, mem_size=mem_size)
        with io.BytesIO() as target:
            with testdata.open("rb") as src:
                data = src.read(READ_BLOCKSIZE)
                while len(data) > 0:
                    target.write(encoder.encode(data))
                    data = src.read(READ_BLOCKSIZE)
                target.write(encoder.flush())

    benchmark.extra_info["data_size"] = src_size
    benchmark(encode, var, max_order, mem_size)


@pytest.mark.benchmark(group="decompress")
@pytest.mark.parametrize("name, var, max_order, mem_size", targets)
def test_benchmark_text_decompress(tmp_path, benchmark, name, var, max_order, mem_size):
    cpuinfo = pytest.importorskip("cpuinfo")

    def decode(var, max_order, mem_size):
        if var == 7:
            decoder = pyppmd.Ppmd7Decoder(max_order=max_order, mem_size=mem_size)
        else:
            decoder = pyppmd.Ppmd8Decoder(max_order=max_order, mem_size=mem_size)
        with tmp_path.joinpath("target.ppmd").open("rb") as src:
            with io.BytesIO() as target:
                remaining = src_size
                data = src.read(READ_BLOCKSIZE)
                while remaining > 0:
                    out = decoder.decode(data, remaining)
                    if len(out) == 0:
                        break
                    target.write(out)
                    remaining = remaining - len(out)
                    data = src.read(READ_BLOCKSIZE)
            assert remaining == 0

    # prepare compressed data
    if var == 7:
        encoder = pyppmd.Ppmd7Encoder(max_order=max_order, mem_size=mem_size)
    else:
        encoder = pyppmd.Ppmd8Encoder(max_order=max_order, mem_size=mem_size)
    with tmp_path.joinpath("target.ppmd").open("wb") as target:
        with testdata.open("rb") as src:
            data = src.read(READ_BLOCKSIZE)
            while len(data) > 0:
                target.write(encoder.encode(data))
                data = src.read(READ_BLOCKSIZE)
            target.write(encoder.flush())

    benchmark.extra_info["data_size"] = src_size
    benchmark(decode, var, max_order, mem_size)