# Copyright 2024 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Tests for tuf.repository module"""
from __future__ import annotations
import copy
import logging
import sys
import unittest
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from securesystemslib.signer import CryptoSigner, Signer
from tests import utils
from tuf.api.metadata import (
TOP_LEVEL_ROLE_NAMES,
DelegatedRole,
Delegations,
Metadata,
MetaFile,
Root,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.repository import Repository
logger = logging.getLogger(__name__)

# Lookup from a top-level role's type name to the Signed subclass that is
# instantiated when that role has no stored metadata yet.
_signed_init = {
    signed_cls.type: signed_cls
    for signed_cls in (Root, Snapshot, Targets, Timestamp)
}
class TestingRepository(Repository):
    """Minimal Repository implementation that stores metadata in memory

    Every call to close() appends a newly signed metadata version to the
    history of the role, so earlier versions are never discarded.

    Mostly copied from examples/repository.

    Attributes:
        role_cache: Keys are role names, values are lists holding every
            Metadata version stored for that role (oldest first)
        signer_cache: Keys are role names, values are lists of the signers
            used when metadata for that role is signed
    """

    expiry_period = timedelta(days=1)

    def __init__(self) -> None:
        # full version history of the metadata of each role
        self.role_cache: dict[str, list[Metadata]] = defaultdict(list)
        # signers currently in use for each role
        self.signer_cache: dict[str, list[Signer]] = defaultdict(list)

        # Version bookkeeping for snapshot and for all targets roles; close()
        # keeps it current. Because the dict is a defaultdict, close() can
        # set the version of a role it has not seen before without having
        # to create the MetaFile itself.
        self._snapshot_info = MetaFile(1)
        self._targets_infos: dict[str, MetaFile] = defaultdict(
            lambda: MetaFile(1)
        )

        # Bootstrap the repository: generate one signing key per top-level
        # role and register it in root ...
        with self.edit_root() as root:
            for rolename in ("root", "timestamp", "snapshot", "targets"):
                new_signer = CryptoSigner.generate_ecdsa()
                self.signer_cache[rolename].append(new_signer)
                root.add_key(new_signer.public_key, rolename)

        # ... then store a first version of the other top-level roles
        for rolename in ("timestamp", "snapshot", "targets"):
            with self.edit(rolename):
                pass

    @property
    def targets_infos(self) -> dict[str, MetaFile]:
        """MetaFile of each targets role, keyed by '<role>.json'

        The versions are maintained by close().
        """
        return self._targets_infos

    @property
    def snapshot_info(self) -> MetaFile:
        """MetaFile of snapshot; its version is maintained by close()"""
        return self._snapshot_info

    def open(self, role: str) -> Metadata:
        """Return the current Metadata of 'role'

        A role with stored history yields a deep copy of its newest version.
        A role without history yields new Metadata with version 0.
        """
        if role in self.role_cache:
            # a copy, so that the stored version cannot be modified by the
            # caller
            history = self.role_cache[role]
            return copy.deepcopy(history[-1])

        # Names missing from the lookup table are treated as targets roles
        signed_cls = _signed_init.get(role, Targets)
        fresh = Metadata(signed_cls())
        # with version 0 here, close() can bump the version unconditionally
        fresh.signed.version = 0
        return fresh

    def close(self, role: str, md: Metadata) -> None:
        """Store 'md' as the newest version of 'role'

        Bumps the version, sets a new expiry, replaces all signatures with
        signatures from the signers cached for 'role' and updates the
        version caches.
        """
        signed = md.signed
        signed.version += 1
        signed.expires = datetime.now(timezone.utc) + self.expiry_period

        # drop existing signatures, then sign with every signer of the role
        md.signatures.clear()
        for role_signer in self.signer_cache[role]:
            md.sign(role_signer, append=True)

        self.role_cache[role].append(md)

        # root and timestamp versions are not tracked in the version caches
        if role in ("root", "timestamp"):
            return
        if role == "snapshot":
            self._snapshot_info.version = signed.version
        else:
            self._targets_infos[f"{role}.json"].version = signed.version
class TestRepository(unittest.TestCase):
    """Tests for tuf.repository, run against TestingRepository."""

    def setUp(self) -> None:
        # each test gets its own newly initialized repository
        self.repo = TestingRepository()

    def _assert_latest_version(self, role: str, expected: int) -> None:
        """Assert that 'role' has 'expected' stored versions and that the
        newest one has version number 'expected'"""
        stored = self.repo.role_cache[role]
        self.assertEqual(expected, len(stored))
        self.assertEqual(expected, stored[-1].signed.version)

    def _replace_signing_key(self, role: str) -> None:
        """Revoke the first key listed for 'role' in root, empty the signer
        cache of 'role' and register a newly generated key instead"""
        with self.repo.edit_root() as root:
            # remove the old key
            old_keyid = root.roles[role].keyids[0]
            root.revoke_key(old_keyid, role)
            self.repo.signer_cache[role].clear()

            # add the replacement key
            new_signer = CryptoSigner.generate_ecdsa()
            self.repo.signer_cache[role].append(new_signer)
            root.add_key(new_signer.public_key, role)

    def test_initial_repo_setup(self) -> None:
        # metadata must exist for exactly the four top-level roles
        self.assertEqual(4, len(self.repo.role_cache))
        for rolename in TOP_LEVEL_ROLE_NAMES:
            # one stored version per role, and that version is 1
            self._assert_latest_version(rolename, 1)

        # the Repository helpers return the matching Signed types
        self.assertIsInstance(self.repo.root(), Root)
        self.assertIsInstance(self.repo.timestamp(), Timestamp)
        self.assertIsInstance(self.repo.snapshot(), Snapshot)
        self.assertIsInstance(self.repo.targets(), Targets)

    def test_do_snapshot(self) -> None:
        # Targets are unchanged and snapshot is still valid: expect a no-op
        snapshot_created, _ = self.repo.do_snapshot()

        self.assertFalse(snapshot_created)
        self._assert_latest_version("snapshot", 1)

    def test_do_snapshot_after_targets_change(self) -> None:
        # Modify targets: do_snapshot is then expected to create a snapshot
        with self.repo.edit_targets() as targets:
            targets.targets["path"] = TargetFile.from_data("path", b"data")

        snapshot_created, _ = self.repo.do_snapshot()

        self.assertTrue(snapshot_created)
        self._assert_latest_version("snapshot", 2)

    def test_do_snapshot_after_new_targets_delegation(self) -> None:
        # Introduce a delegated targets role: do_snapshot is then expected
        # to create a snapshot
        delegated_signer = CryptoSigner.generate_ecdsa()
        self.repo.signer_cache["delegated"].append(delegated_signer)

        # delegate from targets to the new role
        with self.repo.edit_targets() as targets:
            delegated_role = DelegatedRole("delegated", [], 1, True, [])
            targets.delegations = Delegations({}, {"delegated": delegated_role})

            targets.add_key(delegated_signer.public_key, "delegated")

        # store a first version of the delegated metadata
        with self.repo.edit("delegated"):
            pass

        snapshot_created, _ = self.repo.do_snapshot()

        self.assertTrue(snapshot_created)
        self._assert_latest_version("snapshot", 2)

    def test_do_snapshot_after_snapshot_key_change(self) -> None:
        # rotate the snapshot signing key
        self._replace_signing_key("snapshot")

        # The stored snapshot is no longer correctly signed: do_snapshot is
        # expected to create a new snapshot
        snapshot_created, _ = self.repo.do_snapshot()

        self.assertTrue(snapshot_created)
        self._assert_latest_version("snapshot", 2)

    def test_do_timestamp(self) -> None:
        # Snapshot is unchanged and timestamp is still valid: expect a no-op
        timestamp_created, _ = self.repo.do_timestamp()

        self.assertFalse(timestamp_created)
        self._assert_latest_version("timestamp", 1)

    def test_do_timestamp_after_snapshot_change(self) -> None:
        # Force a new snapshot: do_timestamp is then expected to create a
        # timestamp
        self.repo.do_snapshot(force=True)

        timestamp_created, _ = self.repo.do_timestamp()

        self.assertTrue(timestamp_created)
        self._assert_latest_version("timestamp", 2)

    def test_do_timestamp_after_timestamp_key_change(self) -> None:
        # rotate the timestamp signing key
        self._replace_signing_key("timestamp")

        # The stored timestamp is no longer correctly signed: do_timestamp
        # is expected to create a new timestamp
        timestamp_created, _ = self.repo.do_timestamp()

        self.assertTrue(timestamp_created)
        self._assert_latest_version("timestamp", 2)
if __name__ == "__main__":
    # Script entry point: hand the command-line arguments to the shared
    # test-logging setup, then let unittest run the tests of this module.
    utils.configure_test_logging(sys.argv)
    unittest.main()