1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
# Copyright (C) 2025 Siemens
#
# SPDX-License-Identifier: MIT
from datetime import datetime
from email.utils import parsedate_to_datetime
import io
from pathlib import Path
import tarfile
from debian import deb822
import pytest
import requests
import zstandard
import lz4.frame
from debsbom.download import PackageDownloader
from debsbom.repack import SourceArchiveMerger
import debsbom.dpkg.package as dpkg
from debsbom.repack.merger import CorruptedFileError, DscFileNotFoundError
import debsbom.snapshot.client as sdlclient
from debsbom.util import Compression
from debsbom.util.checksum import ChecksumAlgo
def test_compressor_from_tool():
assert Compression.from_tool(None) == Compression.NONE
for c in Compression.formats():
assert Compression.from_tool(c.tool) == c
with pytest.raises(RuntimeError):
Compression.from_tool("false")
def test_compressor_from_ext():
assert Compression.from_ext("") == Compression.NONE
assert Compression.from_ext(None) == Compression.NONE
for c in Compression.formats():
assert Compression.from_ext(c.fileext) == c
with pytest.raises(ValueError):
Compression.from_ext("foobar")
@pytest.fixture(scope="session")
def dldir(tmp_path_factory):
return tmp_path_factory.mktemp("downloads")
@pytest.fixture(scope="session")
def some_packages(dldir, http_session):
sdl = sdlclient.SnapshotDataLake(session=http_session)
dl = PackageDownloader(dldir, http_session)
packages = [
# .orig.tar and .debian.tar
dpkg.SourcePackage(
"libnet-smtp-ssl-perl",
"1.04-2",
checksums={
ChecksumAlgo.SHA256SUM: "b5e63090e1608c37ead4432206028fda37046128bfcaf3eb7ba58251875295a1"
},
),
# .orig.tar and .debian.tar with epoch
dpkg.SourcePackage("libcap2", "1:2.75-10"),
# debian dir in sources
dpkg.SourcePackage("dgit", "13.13"),
# debian dir via compressed .diff
dpkg.SourcePackage("libdata-validate-domain-perl", "0.10-1.1"),
]
for p in packages:
dl.register(list(sdlclient.SourcePackage(sdl, p.name, str(p.version)).srcfiles()), p)
list(dl.download())
return packages
EXPECTED_CHANGELOG_TIMESTAMPS = {
("libnet-smtp-ssl-perl", "1.04-2"): "Fri, 01 Jul 2022 00:09:44 +0100",
("libcap2", "1:2.75-10"): "Sat, 26 Jul 2025 20:46:06 +0200",
("dgit", "13.13"): "Sun, 24 Aug 2025 11:43:28 +0100",
("libdata-validate-domain-perl", "0.10-1.1"): "Sun, 27 Dec 2020 17:26:02 +0100",
}
@pytest.mark.parametrize("compress", [None, "bzip2", "gzip", "xz", "zstd", "lz4"])
@pytest.mark.parametrize("mtime", [None, "Wed, 01 Oct 2025 12:34:56 +0100"])
@pytest.mark.online
def test_merger(tmpdir, some_packages, dldir, compress, mtime):
outdir = Path(tmpdir / "merged")
sam = SourceArchiveMerger(dldir / "sources", outdir, compress=Compression.from_tool(compress))
for p in some_packages:
if mtime:
expected_timestamp_str = mtime
else:
expected_timestamp_str = EXPECTED_CHANGELOG_TIMESTAMPS[(p.name, p.version)]
dt_object = parsedate_to_datetime(expected_timestamp_str)
expected_timestamp = int(dt_object.timestamp())
result = sam.merge(
p,
apply_patches=True,
mtime=dt_object if mtime else None,
)
assert p.name in result.name
extract_path = Path(tmpdir) / f"extracted_{p.name}"
extract_path.mkdir(exist_ok=True, parents=False)
tar_open_args = {"name": result}
if compress == "zstd" or compress == "lz4":
with open(result, mode="rb") as compressed_file:
if compress == "zstd":
dctx = zstandard.ZstdDecompressor()
decompressed_data_buffer = io.BytesIO()
dctx.copy_stream(compressed_file, decompressed_data_buffer)
else:
decompressed_data_buffer = io.BytesIO(
lz4.frame.decompress(compressed_file.read())
)
decompressed_data_buffer.seek(0)
tar_open_args = {"fileobj": decompressed_data_buffer}
with tarfile.open(**tar_open_args, mode="r") as tar:
tar.extraction_filter = getattr(tarfile, "data_filter", (lambda member, path: member))
tar.extractall(path=extract_path)
found = False
for item_path in extract_path.rglob("*"):
found = True
mtime_unix = item_path.stat().st_mtime
assert mtime_unix == expected_timestamp, (
f"Expected mtime {expected_timestamp_str} (Unix: {expected_timestamp}) "
f"but got {datetime.fromtimestamp(mtime_unix)} for file {item_path}"
)
assert found, "No files found in the extracted archive to check timestamps"
@pytest.mark.online
def test_merger_bad_checksum(tmpdir, some_packages, dldir):
outdir = Path(tmpdir / "merged")
sam = SourceArchiveMerger(dldir / "sources", outdir)
print(dldir / "sources")
# Test 1: tamper dsc file
pkg = some_packages[0]
dsc_file = sam.locate_artifact(pkg, sam.dldir)
# tamper dsc file by appending data
with open(dsc_file, "a") as f:
# note: this only tampers the checksum but not the signature
# as we append data outside of the signed block. However, debsbom
# anyways only relies on checksums, not signatures
f.write("\n")
# we don't get a corruption error as the dsc file is looked up by checksum
# as there might be multiple .dsc on the snapshot mirror with the same name
# and we only return the dsc that matches the checksum (if we have a checksum)
with pytest.raises(DscFileNotFoundError):
sam.merge(pkg)
# Test 2: tamper binary artifacts
pkg = some_packages[1]
dsc_file = sam.locate_artifact(pkg, sam.dldir)
with open(dsc_file, "r") as f:
d = deb822.Dsc(f)
filename = d.get("Checksums-Sha1")[0]["name"]
suffix = Path(filename).suffix
# replace with tampered version (correctly compressed tar, but empty)
with tarfile.open(dsc_file.parent / filename, f"w:{suffix[1:]}") as tar:
pass
# as we tamper a binary, the checksum in the dsc file does not match the
# one of the binary and we get the corrupted file error
with pytest.raises(CorruptedFileError):
sam.merge(pkg)
|