File: __init__.py

package info (click to toggle)
py7zr 1.0.0%2Bdfsg1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 6,352 kB
  • sloc: python: 9,009; sh: 360; makefile: 197; ansic: 35
file content (94 lines) | stat: -rw-r--r-- 2,987 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
import binascii
import functools
import hashlib
import os
import pathlib
import shutil
import subprocess
from datetime import datetime, timezone

import pytest

import py7zr

# The libarchive bindings are optional: when the import fails, LAPublic is
# None and libarchive_extract() below becomes a no-op.
try:
    import libarchive.public as LAPublic
except ImportError:
    LAPublic = None

# Directory named "data" located next to this module.
testdata_path = os.path.join(os.path.dirname(__file__), "data")
# Set the process umask to 0o022 as soon as this package is imported.
# NOTE(review): presumably this keeps permission bits of files created by the
# tests predictable for the mode checks in check_output() -- confirm.
os.umask(0o022)


def check_output(expected, tmpdir):
    """Verify files under ``tmpdir`` against a list of expectations.

    Args:
        expected: iterable of dicts. Every dict needs ``"filename"`` (path
            relative to ``tmpdir``) and ``"digest"`` (hex-encoded SHA-256 of
            the file content). ``"mode"`` and ``"mtime"`` are optional; when
            present and truthy they are compared with ``st_mode`` and
            ``st_mtime`` of the file. The mode check only runs on POSIX.
        tmpdir: root directory, either a ``str`` or a path object that
            provides ``joinpath``.

    Raises:
        AssertionError: on the first file that does not match.
    """
    # The str -> Path conversion does not depend on the entry; do it once.
    root = pathlib.Path(tmpdir) if isinstance(tmpdir, str) else tmpdir
    for exp in expected:
        name = exp["filename"]
        target = root.joinpath(name)
        if os.name == "posix" and exp.get("mode"):
            actual_mode = target.stat().st_mode
            assert actual_mode == exp["mode"], "%s, actual: %d, expected: %d" % (
                name,
                actual_mode,
                exp["mode"],
            )
        if exp.get("mtime"):
            actual_mtime = target.stat().st_mtime
            assert actual_mtime == exp["mtime"], "%s, actual: %d, expected: %d" % (
                name,
                actual_mtime,
                exp["mtime"],
            )
        # Read through a context manager so the handle is closed right away
        # instead of being left for the garbage collector.
        with target.open("rb") as stream:
            actual_digest = hashlib.sha256(stream.read()).digest()
        assert actual_digest == binascii.unhexlify(exp["digest"]), "Fails digest for %s" % name


def decode_all(archive, expected, tmpdir):
    """Extract every member of ``archive`` into ``tmpdir`` and verify the result.

    Args:
        archive: an opened archive object exposing ``files``, ``extractall``
            and ``close``; each item of ``files`` must have non-``None``
            ``lastwritetime`` and ``filename`` attributes.
        expected: expectation list forwarded to ``check_output``.
        tmpdir: extraction directory, forwarded to ``extractall`` and
            ``check_output``.

    Raises:
        AssertionError: when member metadata is missing or the extracted
            files do not match ``expected``.
    """
    for file_info in archive.files:
        assert file_info.lastwritetime is not None
        assert file_info.filename is not None
    try:
        archive.extractall(path=tmpdir)
    finally:
        # Close the archive even when extraction fails so its handle is not leaked.
        archive.close()
    check_output(expected, tmpdir)


async def aio7zr(archive, path):
    """Extract ``archive`` into ``path`` on the event loop's default executor.

    The blocking ``extractall`` and ``close`` calls are run in worker threads
    and awaited one after the other, so the coroutine only finishes once the
    archive has been fully extracted and closed, and ``close`` can never run
    concurrently with the extraction.

    Args:
        archive: archive location passed to ``py7zr.SevenZipFile``.
        path: destination directory passed to ``extractall``.

    Raises:
        Any exception raised by ``extractall`` or ``close`` in the worker
        thread is re-raised here when the corresponding future is awaited.
    """
    loop = asyncio.get_running_loop()
    sevenzip = py7zr.SevenZipFile(archive)
    try:
        await loop.run_in_executor(None, functools.partial(sevenzip.extractall, path=path))
    finally:
        # Close only after extraction has finished (or failed).
        await loop.run_in_executor(None, sevenzip.close)


def ltime(dt_utc):
    """Render a UTC datetime as a local-time ``YYYY-MM-DD HH:MM:SS`` string.

    ``dt_utc`` is stamped as UTC (any tzinfo it already carries is replaced),
    converted to the system's local timezone and formatted.
    """
    stamped_utc = dt_utc.replace(tzinfo=timezone.utc)
    local_dt = stamped_utc.astimezone()
    return local_dt.strftime("%Y-%m-%d %H:%M:%S")


def ltime2(y, m, d, h, min, s):
    """Build a datetime from the given UTC date/time fields and format it with ``ltime``."""
    naive_utc = datetime(year=y, month=m, day=d, hour=h, minute=min, second=s)
    return ltime(naive_utc)


def p7zip_test(archive):
    """Run ``7z t`` on ``archive`` and fail the current test on a non-zero exit.

    Does nothing when no ``7z`` executable is found on ``PATH``.
    ``archive`` must provide ``as_posix()``.
    """
    if not shutil.which("7z"):
        return
    command = ["7z", "t", archive.as_posix()]
    completed = subprocess.run(command, stdout=subprocess.PIPE)
    if completed.returncode == 0:
        return
    # Show the captured 7z output before reporting the failure.
    print(completed.stdout)
    pytest.fail("7z command report error")


def libarchive_extract(archive, tmpdir):
    """Extract ``archive`` into ``tmpdir`` using the optional libarchive bindings.

    Does nothing when ``LAPublic`` is falsy (the bindings could not be
    imported).

    Args:
        archive: archive location; converted with ``str()`` for the reader.
        tmpdir: destination directory as a ``pathlib.Path``; created if missing.

    Entries flagged ``IFDIR`` become directories, entries flagged ``IFLNK``
    become symbolic links to ``entry.symlink_targetpath``, and every other
    entry is written as a regular file from its data blocks.
    """
    if not LAPublic:
        return
    tmpdir.mkdir(parents=True, exist_ok=True)
    with LAPublic.file_reader(str(archive)) as reader:
        for entry in reader:
            target = tmpdir.joinpath(entry.pathname)
            if entry.filetype.IFDIR:
                # exist_ok: the directory may already have been created as the
                # parent of an earlier entry.
                target.mkdir(parents=True, exist_ok=True)
                continue
            # Do not rely on the archive listing directories before their content.
            target.parent.mkdir(parents=True, exist_ok=True)
            if entry.filetype.IFLNK:
                # symlink_to() creates the link at `target`. The previous
                # Path.link_to() made a hard link with reversed argument
                # meaning and no longer exists in Python 3.12+.
                target.symlink_to(entry.symlink_targetpath)
            else:
                with target.open(mode="wb") as f:
                    for block in entry.get_blocks():
                        f.write(block)