1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
# SPDX-FileCopyrightText: Peter Pentchev <roam@ringlet.net>
# SPDX-License-Identifier: GPL-2.0-or-later
"""Helper functions for the `testsigs` unit test suite for `debsigs`."""
from __future__ import annotations
import contextlib
import dataclasses
import functools
import itertools
import os
import pathlib
import subprocess # noqa: S404
import sys
import tempfile
import typing
import feature_check
import pytest
import typedload.dataloader
from feature_check import version as fver
from testsigs import defs # noqa: TC001 # typedload needs this
from testsigs.parse import debsigs as p_debsigs
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib
if typing.TYPE_CHECKING:
from collections.abc import Iterator
from typing import Final
@dataclasses.dataclass(frozen=True)
class ConfigAggrBase:
    """The combined test settings that the `testsigs` runner stores for the unit tests."""

    cfg: defs.Config
    """The `testsigs` runner's own runtime configuration."""

    keys: defs.KeyFiles
    """The OpenPGP keys that were generated ahead of time."""

    pub_key: defs.PublicKey
    """The OpenPGP key that signatures are made with."""

    changes_orig: defs.ChangesFile
    """The path to the downloaded Debian package that is to be signed."""

    @contextlib.contextmanager
    def cfg_with_tempd(self) -> Iterator[defs.Config]:
        """Yield a copy of `cfg` whose `path.work` is a fresh temporary directory.

        The directory is created below the current `cfg.path.work` with
        a `unit.` name prefix; it only lives as long as the context does.
        """
        parent: Final = self.cfg.path.work
        with tempfile.TemporaryDirectory(prefix="unit.", dir=parent) as scratch:
            # Neither `cfg` nor `cfg.path` is modified: build replaced copies instead.
            scratch_paths = dataclasses.replace(self.cfg.path, work=pathlib.Path(scratch))
            yield dataclasses.replace(self.cfg, path=scratch_paths)
@dataclasses.dataclass(frozen=True)
class ConfigAggr(ConfigAggrBase):
    """The aggregated test configuration plus the features that `debsigs` reports."""

    debsigs_features: dict[str, feature_check.Version]
    """What `debsigs --features` printed, in parsed form."""

    def has_feature(self, feature: str, min_version: str) -> bool:
        """Check that a feature is present at `min_version` or a later version."""
        current: Final = self.debsigs_features.get(feature)
        if current is None:
            # The feature is not listed at all.
            return False

        wanted: Final = feature_check.parse_version(min_version)
        return fver.version_compare(current, wanted) >= 0

    def has_all_features(self, features: dict[str, str]) -> bool:
        """Check that every listed feature is present at its minimum version or later."""
        return all(
            self.has_feature(name, min_version) for name, min_version in features.items()
        )
@functools.lru_cache
def get_loader() -> typedload.dataloader.Loader:
    """Build the dataloader that can cope with deferred type annotations.

    The result is cached, so repeated calls hand back the same loader object.
    """
    loader: Final = typedload.dataloader.Loader(pep563=True)
    return loader
@pytest.fixture
def test_cfg() -> ConfigAggr:
    """Read the aggregated test configuration and add the `debsigs` features to it.

    The TOML file to read is named by the `DEBSIGS_TEST_CONFIG_FILE`
    environment variable.
    """
    cfg_path: Final = pathlib.Path(os.environ["DEBSIGS_TEST_CONFIG_FILE"])
    with cfg_path.open(mode="rb") as cfgf:
        raw: Final = tomllib.load(cfgf)

    base: Final = get_loader().load(raw, ConfigAggrBase)

    # Ask the `debsigs` program under test what it supports.
    features: Final = feature_check.obtain_features(str(base.cfg.prog.debsigs))
    return ConfigAggr(
        cfg=base.cfg,
        keys=base.keys,
        pub_key=base.pub_key,
        changes_orig=base.changes_orig,
        debsigs_features=features,
    )
def debsigs_list(cfg: defs.Config, pkg: pathlib.Path) -> list[defs.DebSig]:
    """Invoke `debsigs --list` on a package, parse the signatures that it reports.

    The program is run in `cfg.path.work` with the `cfg.env` environment;
    a non-zero exit code raises `subprocess.CalledProcessError`.
    """
    listing: Final = subprocess.run(  # noqa: S603
        [cfg.prog.debsigs, "--list", "--", pkg],
        check=True,
        cwd=cfg.path.work,
        encoding="UTF-8",
        env=cfg.env,
        stdout=subprocess.PIPE,
    )
    return p_debsigs.parse_list(listing.stdout)
def get_signing_subkey(pub_key: defs.PublicKey) -> defs.Subkey:
"""Find a single subkey capable of signing."""
match [subkey for subkey in pub_key.subkeys if "s" in subkey.capabilities]:
case [single]:
return single
case _:
raise RuntimeError(repr(pub_key))
|