1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
"""Test reading, writing and extracting archives."""
import io
import json
import libarchive
from libarchive.entry import format_time
from libarchive.extract import EXTRACT_OWNER, EXTRACT_PERM, EXTRACT_TIME
from libarchive.write import memory_writer
from unittest.mock import patch
import pytest
from . import check_archive, in_dir, treestat
def test_buffers(tmpdir):
    """Round-trip ``libarchive/`` through an in-memory gnutar+xz archive.

    The directory is written into a preallocated buffer, read back and
    compared against the on-disk tree, then extracted into ``tmpdir`` and
    compared once more.
    """
    # Snapshot of what the archive is expected to contain
    expected_tree = treestat('libarchive')

    # Write the libarchive/ directory into a fixed-size memory buffer
    buf = bytes(bytearray(1000000))
    with libarchive.memory_writer(buf, 'gnutar', 'xz') as writer:
        writer.add_files('libarchive/')

    # Read the buffer back and verify both content and container metadata
    with libarchive.memory_reader(buf) as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'GNU tar format'
        assert reader.filter_names == [b'xz']

    # Unpack inside tmpdir and make sure the extracted tree is unchanged
    with in_dir(tmpdir.strpath):
        libarchive.extract_memory(
            buf, EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
        )
        extracted_tree = treestat('libarchive')
        assert extracted_tree == expected_tree
def test_fd(tmpdir):
    """Round-trip ``libarchive/`` through a file-descriptor based archive.

    A gnutar+bzip2 archive is written to a file inside ``tmpdir`` through
    its descriptor, read back through the same descriptor, and finally
    extracted into ``tmpdir``; the extracted tree must equal the original.
    """
    # Open via a context manager so the descriptor is released even when
    # one of the assertions below fails.
    with open(tmpdir.strpath+'/test.tar.bz2', 'w+b') as archive_file:
        fd = archive_file.fileno()

        # Collect information on what should be in the archive
        tree = treestat('libarchive')

        # Create an archive of our libarchive/ directory
        with libarchive.fd_writer(fd, 'gnutar', 'bzip2') as archive:
            archive.add_files('libarchive/')

        # Read the archive and check that the data is correct
        archive_file.seek(0)  # rewind: reader shares the writer's descriptor
        with libarchive.fd_reader(fd) as archive:
            check_archive(archive, tree)
            assert archive.format_name == b'GNU tar format'
            assert archive.filter_names == [b'bzip2']

        # Extract the archive in tmpdir and check that the data is intact
        archive_file.seek(0)  # rewind again before extracting
        with in_dir(tmpdir.strpath):
            flags = EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
            libarchive.extract_fd(fd, flags)
            tree2 = treestat('libarchive')
            assert tree2 == tree
def test_files(tmpdir):
    """Round-trip ``libarchive/`` through a ustar+gzip archive on disk.

    The archive is created at a path inside ``tmpdir``, read back and
    checked, then extracted into ``tmpdir`` and compared with the source.
    """
    target = tmpdir.strpath+'/test.tar.gz'

    # Snapshot of what the archive is expected to contain
    expected_tree = treestat('libarchive')

    # Write the libarchive/ directory to the archive file
    with libarchive.file_writer(target, 'ustar', 'gzip') as writer:
        writer.add_files('libarchive/')

    # Re-open the file and verify both content and container metadata
    with libarchive.file_reader(target) as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'POSIX ustar format'
        assert reader.filter_names == [b'gzip']

    # Unpack inside tmpdir and make sure the extracted tree is unchanged
    with in_dir(tmpdir.strpath):
        libarchive.extract_file(
            target, EXTRACT_OWNER | EXTRACT_PERM | EXTRACT_TIME
        )
        extracted_tree = treestat('libarchive')
        assert extracted_tree == expected_tree
def test_custom_writer_and_stream_reader():
    """Write a zip through a custom callback and read it back as a stream.

    The writer pushes its output into a ``BytesIO`` via ``write``; the
    same object is then rewound and handed to ``stream_reader``.
    """
    # Snapshot of what the archive is expected to contain
    expected_tree = treestat('libarchive')

    # Archive the libarchive/ directory into an in-memory stream
    sink = io.BytesIO()
    with libarchive.custom_writer(sink.write, 'zip') as writer:
        writer.add_files('libarchive/')
    sink.seek(0)  # rewind so the reader starts at the first byte

    # Read the stream back and verify both content and container metadata
    with libarchive.stream_reader(sink, 'zip') as reader:
        check_archive(reader, expected_tree)
        assert reader.format_name == b'ZIP 2.0 (deflation)'
        assert reader.filter_names == []
@patch('libarchive.ffi.write_fail')
def test_write_fail(write_fail_mock):
    """Check that ``ffi.write_fail`` is called when the writer block raises.

    ``write_fail_mock`` is the mock that ``patch`` substitutes for
    ``libarchive.ffi.write_fail``.
    """
    scratch = bytes(bytearray(1000000))
    try:
        with memory_writer(scratch, 'gnutar', 'xz') as writer:
            writer.add_files('libarchive/')
            # Force an error to escape from inside the with-block
            raise TypeError
    except TypeError:
        # The exception is the trigger, not the subject, of this test
        pass
    assert write_fail_mock.called
@patch('libarchive.ffi.write_fail')
def test_write_not_fail(write_fail_mock):
    """Check that ``ffi.write_fail`` is not called on a clean write.

    ``write_fail_mock`` is the mock that ``patch`` substitutes for
    ``libarchive.ffi.write_fail``.
    """
    scratch = bytes(bytearray(1000000))
    with memory_writer(scratch, 'gnutar', 'xz') as writer:
        writer.add_files('libarchive/')
    # Checked after the with-block has exited normally
    assert not write_fail_mock.called
def test_adding_nonexistent_file_to_archive():
    """Adding a missing path must raise ``ArchiveError``.

    After the expected failure, the same writer is asked to add
    ``libarchive/``; that call is expected to complete without raising.
    """
    sink = io.BytesIO()
    with libarchive.custom_writer(sink.write, 'zip') as writer:
        with pytest.raises(libarchive.ArchiveError):
            writer.add_files('nonexistent')
        # Same writer, now with a path that exists
        writer.add_files('libarchive/')
@pytest.mark.parametrize(
    'archfmt,data_bytes',
    [('zip', b'content'),
     ('gnutar', b''),
     ('pax', json.dumps({'a': 1, 'b': 2, 'c': 3}).encode()),
     ('7zip', b'lorem\0ipsum')])
def test_adding_entry_from_memory(archfmt, data_bytes):
    """Add one entry from an in-memory payload and read it back.

    The archive is produced through a custom write callback that collects
    the emitted blocks; the joined bytes are then opened with
    ``memory_reader`` and every entry's data, path, timestamps and
    ownership are compared with what was written.
    """
    entry_path = 'testfile.data'
    entry_data = data_bytes
    entry_size = len(data_bytes)

    # NOTE(review): the parametrized ``archfmt`` is overwritten here, so
    # every case actually writes a zip archive -- confirm whether this is
    # intentional before relying on the other formats being covered.
    archfmt = 'zip'
    has_birthtime = archfmt != 'zip'

    # (seconds, nanoseconds) pairs handed to the writer
    atime = (1482144741, 495628118)
    mtime = (1482155417, 659017086)
    ctime = (1482145211, 536858081)
    btime = (1482144740, 495628118) if has_birthtime else None

    chunks = []

    def collect(data):
        # Store a copy of the block and report it as fully consumed
        chunks.append(data[:])
        return len(data)

    with libarchive.custom_writer(collect, archfmt) as writer:
        writer.add_file_from_memory(
            entry_path, entry_size, entry_data,
            atime=atime, mtime=mtime, ctime=ctime, birthtime=btime,
            uid=1000, gid=1000,
        )

    archived = b''.join(chunks)
    with libarchive.memory_reader(archived) as reader:
        for entry in reader:
            assert entry_data == b''.join(entry.get_blocks())
            assert entry.path == entry_path
            # Accept either the bare seconds or the format_time() result
            for attr, stamp in (
                ('atime', atime), ('mtime', mtime), ('ctime', ctime),
            ):
                assert getattr(entry, attr) in (
                    stamp[0], format_time(*stamp)
                )
            if has_birthtime:
                assert entry.birthtime in (
                    btime[0], format_time(*btime)
                )
            assert entry.uid == 1000
            assert entry.gid == 1000
|