1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
|
from contextlib import contextmanager
from ctypes import cast, c_void_p, POINTER, create_string_buffer
from os import fstat, stat
from . import ffi
from .ffi import (
ARCHIVE_EOF, OPEN_CALLBACK, READ_CALLBACK, CLOSE_CALLBACK, SEEK_CALLBACK,
NO_OPEN_CB, NO_CLOSE_CB, page_size,
)
from .entry import ArchiveEntry, PassedArchiveEntry
class ArchiveRead:
def __init__(self, archive_p, header_codec='utf-8'):
self._pointer = archive_p
self.header_codec = header_codec
def __iter__(self):
"""Iterates through an archive's entries.
"""
archive_p = self._pointer
header_codec = self.header_codec
read_next_header2 = ffi.read_next_header2
while 1:
entry = ArchiveEntry(archive_p, header_codec)
r = read_next_header2(archive_p, entry._entry_p)
if r == ARCHIVE_EOF:
return
yield entry
entry.__class__ = PassedArchiveEntry
@property
def bytes_read(self):
return ffi.filter_bytes(self._pointer, -1)
@property
def filter_names(self):
count = ffi.filter_count(self._pointer)
return [ffi.filter_name(self._pointer, i) for i in range(count - 1)]
@property
def format_name(self):
return ffi.format_name(self._pointer)
@contextmanager
def new_archive_read(format_name='all', filter_name='all', passphrase=None):
"""Creates an archive struct suitable for reading from an archive.
Returns a pointer if successful. Raises ArchiveError on error.
"""
archive_p = ffi.read_new()
try:
if passphrase:
if not isinstance(passphrase, bytes):
passphrase = passphrase.encode('utf-8')
try:
ffi.read_add_passphrase(archive_p, passphrase)
except AttributeError:
raise NotImplementedError(
f"the libarchive being used (version {ffi.version_number()}, "
f"path {ffi.libarchive_path}) doesn't support encryption"
)
ffi.get_read_filter_function(filter_name)(archive_p)
ffi.get_read_format_function(format_name)(archive_p)
yield archive_p
finally:
ffi.read_free(archive_p)
@contextmanager
def custom_reader(
read_func, format_name='all', filter_name='all',
open_func=None, seek_func=None, close_func=None,
block_size=page_size, archive_read_class=ArchiveRead, passphrase=None,
header_codec='utf-8',
):
"""Read an archive using a custom function.
"""
open_cb = OPEN_CALLBACK(open_func) if open_func else NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = CLOSE_CALLBACK(close_func) if close_func else NO_CLOSE_CB
seek_cb = SEEK_CALLBACK(seek_func)
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if seek_func:
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield archive_read_class(archive_p, header_codec)
@contextmanager
def fd_reader(
fd, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file descriptor.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = fstat(fd).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_fd(archive_p, fd, block_size)
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def file_reader(
path, format_name='all', filter_name='all', block_size=4096, passphrase=None,
header_codec='utf-8',
):
"""Read an archive from a file.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
try:
block_size = stat(path).st_blksize
except (OSError, AttributeError): # pragma: no cover
pass
ffi.read_open_filename_w(archive_p, path, block_size)
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def memory_reader(
buf, format_name='all', filter_name='all', passphrase=None,
header_codec='utf-8',
):
"""Read an archive from memory.
"""
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
ffi.read_open_memory(archive_p, cast(buf, c_void_p), len(buf))
yield ArchiveRead(archive_p, header_codec)
@contextmanager
def stream_reader(
stream, format_name='all', filter_name='all', block_size=page_size,
passphrase=None, header_codec='utf-8',
):
"""Read an archive from a stream.
The `stream` object must support the standard `readinto` method.
If `stream.seekable()` returns `True`, then an appropriate seek callback is
passed to libarchive.
"""
buf = create_string_buffer(block_size)
buf_p = cast(buf, c_void_p)
def read_func(archive_p, context, ptrptr):
# readinto the buffer, returns number of bytes read
length = stream.readinto(buf)
# write the address of the buffer into the pointer
ptrptr = cast(ptrptr, POINTER(c_void_p))
ptrptr[0] = buf_p
# tell libarchive how much data was written into the buffer
return length
def seek_func(archive_p, context, offset, whence):
stream.seek(offset, whence)
# tell libarchive the current position
return stream.tell()
open_cb = NO_OPEN_CB
read_cb = READ_CALLBACK(read_func)
close_cb = NO_CLOSE_CB
seek_cb = SEEK_CALLBACK(seek_func)
with new_archive_read(format_name, filter_name, passphrase) as archive_p:
if stream.seekable():
ffi.read_set_seek_callback(archive_p, seek_cb)
ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
yield ArchiveRead(archive_p, header_codec)
seekable_stream_reader = stream_reader
|