File: __init__.py

package info (click to toggle)
python-libarchive-c 5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 908 kB
  • sloc: python: 1,552; makefile: 7
file content (136 lines) | stat: -rw-r--r-- 4,297 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from contextlib import closing, contextmanager
from copy import copy
from os import chdir, getcwd, stat, walk
from os.path import abspath, dirname, join
from stat import S_ISREG
import tarfile
try:
    from stat import filemode
except ImportError:  # Python 2
    filemode = tarfile.filemode

from libarchive import file_reader


data_dir = join(dirname(__file__), 'data')


def check_archive(archive, tree):
    tree2 = copy(tree)
    for e in archive:
        epath = str(e).rstrip('/')
        assert epath in tree2
        estat = tree2.pop(epath)
        assert e.mtime == int(estat['mtime'])
        if not e.isdir:
            size = e.size
            if size is not None:
                assert size == estat['size']
            with open(epath, 'rb') as f:
                for block in e.get_blocks():
                    assert f.read(len(block)) == block
                leftover = f.read()
                assert not leftover

    # Check that there are no missing directories or files
    assert len(tree2) == 0


def get_entries(location):
    """
    Using the archive file at `location`, return an iterable of name->value
    mappings for each libarchive.ArchiveEntry objects essential attributes.
    Paths are base64-encoded because JSON is UTF-8 and cannot handle
    arbitrary binary pathdata.
    """
    with file_reader(location) as arch:
        for entry in arch:
            # libarchive introduces prefixes such as h prefix for
            # hardlinks: tarfile does not, so we ignore the first char
            mode = entry.strmode[1:].decode('ascii')
            yield {
                'path': surrogate_decode(entry.pathname),
                'mtime': entry.mtime,
                'size': entry.size,
                'mode': mode,
                'isreg': entry.isreg,
                'isdir': entry.isdir,
                'islnk': entry.islnk,
                'issym': entry.issym,
                'linkpath': surrogate_decode(entry.linkpath),
                'isblk': entry.isblk,
                'ischr': entry.ischr,
                'isfifo': entry.isfifo,
                'isdev': entry.isdev,
                'uid': entry.uid,
                'gid': entry.gid
            }


def get_tarinfos(location):
    """
    Using the tar archive file at `location`, return an iterable of
    name->value mappings for each tarfile.TarInfo objects essential
    attributes.
    Paths are base64-encoded because JSON is UTF-8 and cannot handle
    arbitrary binary pathdata.
    """
    with closing(tarfile.open(location)) as tar:
        for entry in tar:
            path = surrogate_decode(entry.path or '')
            if entry.isdir() and not path.endswith('/'):
                path += '/'
            # libarchive introduces prefixes such as h prefix for
            # hardlinks: tarfile does not, so we ignore the first char
            mode = filemode(entry.mode)[1:]
            yield {
                'path': path,
                'mtime': entry.mtime,
                'size': entry.size,
                'mode': mode,
                'isreg': entry.isreg(),
                'isdir': entry.isdir(),
                'islnk': entry.islnk(),
                'issym': entry.issym(),
                'linkpath': surrogate_decode(entry.linkpath or None),
                'isblk': entry.isblk(),
                'ischr': entry.ischr(),
                'isfifo': entry.isfifo(),
                'isdev': entry.isdev(),
                'uid': entry.uid,
                'gid': entry.gid
            }


@contextmanager
def in_dir(dirpath):
    prev = abspath(getcwd())
    chdir(dirpath)
    try:
        yield
    finally:
        chdir(prev)


def stat_dict(path):
    keys = set(('uid', 'gid', 'mtime'))
    mode, _, _, _, uid, gid, size, _, mtime, _ = stat(path)
    if S_ISREG(mode):
        keys.add('size')
    return {k: v for k, v in locals().items() if k in keys}


def treestat(d, stat_dict=stat_dict):
    r = {}
    for dirpath, dirnames, filenames in walk(d):
        r[dirpath] = stat_dict(dirpath)
        for fname in filenames:
            fpath = join(dirpath, fname)
            r[fpath] = stat_dict(fpath)
    return r


def surrogate_decode(o):
    if isinstance(o, bytes):
        return o.decode('utf8', errors='surrogateescape')
    return o