File: test_zipfile.py

package info (click to toggle)
python-zipfile-zstd 0.0.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,300 kB
  • sloc: python: 187; makefile: 2
file content (105 lines) | stat: -rw-r--r-- 3,749 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# ported from https://github.com/cielavenir/zipfile39/blob/master/test/test_zipfile.py #

import os
import sys
import hashlib
import subprocess
import itertools
import pytest
from tempfile import TemporaryDirectory
from inspect import signature

# Absolute path of the directory that holds this test module.
mydir = os.path.dirname(os.path.abspath(__file__))
# Make modules in the parent directory importable (appended, so entries
# already on sys.path are searched first).
sys.path.append(os.path.join(mydir, os.pardir))
# Run from the module's own directory so the relative 'data/...' paths used
# by the tests resolve regardless of where pytest was started.
os.chdir(mydir)

import zipfile
import zipfile_zstd

# Raw output of `7z i`, captured once at import time. check_output raises if
# the 7z executable cannot be run or exits with a non-zero status.
info7z = subprocess.check_output(['7z', 'i'])

# Maps each zip compression method to whether the local 7z can verify it.
# Stored entries are always treated as verifiable; every other method counts
# as available only when its marker byte string occurs in the `7z i` output.
avail7z = {zipfile.ZIP_STORED: True}
avail7z.update(
    (method, marker in info7z)
    for method, marker in (
        (zipfile.ZIP_DEFLATED,  b'    40108 Deflate'),
        (zipfile.ZIP_BZIP2,     b'    40202 BZip2'),
        (zipfile.ZIP_LZMA,      b'    30101 LZMA'),
        (zipfile.ZIP_ZSTANDARD, b'  4F71101 ZSTD'),
        # (zipfile.ZIP_XZ,        b'       21 LZMA2'),
        # (zipfile.ZIP_PPMD,      b'    30401 PPMD'),
    )
)

# Input files that every parametrized test compresses and reads back.
# The paths are relative: the module changes into its own directory at import
# time (see the os.chdir call above), so they resolve next to this file.
fnames = [
    'data/10000SalesRecords.csv',
    # 'data/7zz',
]

# (compression method, level) pairs used to parametrize the tests.
# The level is forwarded to ZipFile as `compresslevel`, but only when the
# zipfile module in use accepts that parameter (checked inside each test).
methods = [
    (zipfile.ZIP_STORED, 0),
    (zipfile.ZIP_DEFLATED, 6),
    (zipfile.ZIP_BZIP2, 9),
    (zipfile.ZIP_LZMA, 6),
    (zipfile.ZIP_ZSTANDARD, 3),
    # (zipfile.ZIP_XZ, 6),
    # (zipfile.ZIP_PPMD, 5),
]

@pytest.mark.parametrize('fname,method,level',[
    (fname, *method) for fname, method in itertools.product(fnames, methods)
])
def test_zipfile_writeread(fname,method,level):
    """Round-trip *fname* through ``ZipFile.write`` and ``ZipFile.read``.

    The file is stored in a temporary archive using compression *method*
    (and *level*, when the zipfile module supports ``compresslevel``), the
    archive is optionally verified with ``7z t``, and the member is read back
    in one call. The decompressed data must match the original file in both
    size and SHA-256 digest.

    Parameters:
        fname: path of the file to archive (also used as the member name).
        method: zipfile compression constant to write with.
        level: compression level to request for *method*.
    """
    st = os.stat(fname)
    with open(fname, 'rb') as f:
        body = f.read()
    sha256 = hashlib.sha256(body).hexdigest()

    with TemporaryDirectory() as tmpdir:
        zip_path = os.path.join(tmpdir, 'test.zip')
        kwargs = {'compression': method}
        # Only pass compresslevel when the zipfile implementation in use
        # actually understands it.
        if 'compresslevel' in signature(zipfile._get_compressor).parameters:
            kwargs['compresslevel'] = level
        with zipfile.ZipFile(zip_path, 'w', **kwargs) as archive:
            archive.write(fname)
        # Cross-check the archive with 7z when it reports support for the method.
        if avail7z[method]:
            subprocess.check_call(['7z', 't', zip_path], shell=False)
        with zipfile.ZipFile(zip_path, 'r') as archive:
            info = archive.getinfo(fname)
            assert info.compress_type == method
            dec = archive.read(info)
            # These two comparisons were previously bare expressions without
            # `assert`, so a corrupted round-trip could never fail the test.
            assert len(dec) == st.st_size
            assert hashlib.sha256(dec).hexdigest() == sha256

@pytest.mark.parametrize('fname,method,level',[
    (fname, *method) for fname, method in itertools.product(fnames, methods)
])
def test_zipfile_open(fname,method,level):
    """Round-trip *fname* through the streaming ``ZipFile.open`` interface.

    The file's bytes are written to an archive member in 512-byte pieces,
    the archive is optionally verified with ``7z t``, and the member is then
    read back in pieces of the same size. The total number of bytes read and
    their SHA-256 digest must match the original file.
    """
    chunk_size = 512
    file_stat = os.stat(fname)

    with open(fname, 'rb') as src:
        body = src.read()
    expected_digest = hashlib.sha256(body).hexdigest()

    with TemporaryDirectory() as tmpdir:
        zip_path = os.path.join(tmpdir, 'test.zip')

        kwargs = {'compression': method}
        # Only pass compresslevel when the zipfile implementation in use
        # actually understands it.
        if 'compresslevel' in signature(zipfile._get_compressor).parameters:
            kwargs['compresslevel'] = level

        with zipfile.ZipFile(zip_path, 'w', **kwargs) as archive:
            with archive.open(fname, 'w') as member:
                # One write per chunk_size-sized slice of the file body.
                for offset in range(0, file_stat.st_size, chunk_size):
                    member.write(body[offset:offset + chunk_size])

        # Cross-check the archive with 7z when it reports support for the method.
        if avail7z[method]:
            subprocess.check_call(['7z', 't', zip_path], shell=False)

        with zipfile.ZipFile(zip_path, 'r') as archive:
            info = archive.getinfo(fname)
            assert info.compress_type == method

            total_read = 0
            digest = hashlib.sha256()
            with archive.open(info, 'r') as member:
                while True:
                    piece = member.read(chunk_size)
                    total_read += len(piece)
                    digest.update(piece)
                    # A short (possibly empty) read ends the loop.
                    if len(piece) < chunk_size:
                        break
                    # Otherwise read() must not have returned more than requested.
                    assert len(piece) == chunk_size

            assert total_read == file_stat.st_size
            assert digest.hexdigest() == expected_digest