File: test_pyzstd.py

package info (click to toggle)
numcodecs 0.16.5%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 968 kB
  • sloc: python: 3,938; makefile: 42
file content (82 lines) | stat: -rw-r--r-- 2,734 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# Check Zstd against pyzstd package

import numpy as np
import pytest
try:
    from compression import zstd as pyzstd
except ImportError:
    try:
       from backports import zstd as pyzstd
    except ImportError:
        import pyzstd

from numcodecs.zstd import Zstd

test_data = [
    b"Hello World!",
    np.arange(113).tobytes(),
    np.arange(10, 15).tobytes(),
    np.random.randint(3, 50, size=(53,), dtype=np.uint16).tobytes(),
]


@pytest.mark.parametrize("input", test_data)
def test_pyzstd_simple(input):
    """
    Test if Zstd.[decode, encode] can perform the inverse operation to
    pyzstd.[compress, decompress] in the simple case.
    """
    z = Zstd()
    assert z.decode(pyzstd.compress(input)) == input
    assert pyzstd.decompress(z.encode(input)) == input


@pytest.mark.parametrize("input", test_data)
def test_pyzstd_simple_multiple_frames_decode(input):
    """
    Test decompression of two concatenated frames of known sizes

    numcodecs.zstd.Zstd currently fails because it only assesses the size of the
    first frame. Rather, it should keep iterating through all the frames until
    the end of the input buffer.
    """
    z = Zstd()
    assert pyzstd.decompress(pyzstd.compress(input) * 2) == input * 2
    assert z.decode(pyzstd.compress(input) * 2) == input * 2


@pytest.mark.parametrize("input", test_data)
def test_pyzstd_simple_multiple_frames_encode(input):
    """
    Test if pyzstd can decompress two concatenated frames from Zstd.encode
    """
    z = Zstd()
    assert pyzstd.decompress(z.encode(input) * 2) == input * 2


@pytest.mark.parametrize("input", test_data)
def test_pyzstd_streaming(input):
    """
    Test if Zstd can decode a single frame and concatenated frames in streaming
    mode where the decompressed size is not recorded in the frame header.
    """
    pyzstd_c = pyzstd.ZstdCompressor()
    pyzstd_d = pyzstd.ZstdDecompressor()
    # pyzstd_e = pyzstd.EndlessZstdDecompressor()
    z = Zstd()

    d_bytes = input
    pyzstd_c.compress(d_bytes)
    c_bytes = pyzstd_c.flush()
    assert z.decode(c_bytes) == d_bytes
    assert pyzstd_d.decompress(z.encode(d_bytes)) == d_bytes

    # Test multiple streaming frames
    assert z.decode(c_bytes * 2) == pyzstd.decompress(c_bytes * 2)
    assert z.decode(c_bytes * 3) == pyzstd.decompress(c_bytes * 3)
    assert z.decode(c_bytes * 4) == pyzstd.decompress(c_bytes * 4)
    assert z.decode(c_bytes * 5) == pyzstd.decompress(c_bytes * 5)
    assert z.decode(c_bytes * 7) == pyzstd.decompress(c_bytes * 7)
    assert z.decode(c_bytes * 11) == pyzstd.decompress(c_bytes * 11)
    assert z.decode(c_bytes * 13) == pyzstd.decompress(c_bytes * 13)
    assert z.decode(c_bytes * 99) == pyzstd.decompress(c_bytes * 99)