File: _repository.py

package info (click to toggle)
python-tuf 6.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,300 kB
  • sloc: python: 7,738; makefile: 8
file content (277 lines) | stat: -rw-r--r-- 10,070 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

"""Repository Abstraction for metadata management"""

from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from contextlib import contextmanager, suppress
from copy import deepcopy
from typing import TYPE_CHECKING

from tuf.api.exceptions import UnsignedMetadataError
from tuf.api.metadata import (
    Metadata,
    MetaFile,
    Root,
    Signed,
    Snapshot,
    Targets,
    Timestamp,
)

if TYPE_CHECKING:
    from collections.abc import Generator

logger = logging.getLogger(__name__)


class AbortEdit(Exception):  # noqa: N818
    """Raise to exit the edit() contextmanager without saving changes"""


class Repository(ABC):
    """Abstract class for metadata modifying implementations

    NOTE: The repository module is not considered part of the python-tuf
    stable API yet.

    This class is intended to be a base class used in any metadata editing
    application, whether it is a real repository server or a developer tool.

    Implementations must implement open() and close(), and can then use the
    edit() contextmanager to implement actual operations. Note that signing
    an already existing version of metadata (as could be done for threshold
    signing) does not fit into this model of open()+close() or edit().

    A few operations (snapshot and timestamp) are already implemented
    in this base class.
    """

    @abstractmethod
    def open(self, role: str) -> Metadata:
        """Load a roles metadata from storage or cache, return it

        If role has no metadata, create first version from scratch.
        """
        raise NotImplementedError

    @abstractmethod
    def close(self, role: str, md: Metadata) -> None:
        """Write roles metadata into storage

        Update expiry and version and replace signatures with ones from all
        available keys. Keep snapshot_info and targets_infos updated.
        """
        raise NotImplementedError

    @property
    def targets_infos(self) -> dict[str, MetaFile]:
        """Returns the MetaFiles for current targets metadatas

        This property is used by do_snapshot() to update Snapshot.meta:
        Repository implementations should override this property to enable
        do_snapshot().

        Note that there is a difference between this return value and
        Snapshot.meta: This dictionary reflects the targets metadata that
        currently exists in the repository but Snapshot.meta also includes
        metadata that used to exist, but no longer exists, in the repository.
        """
        raise NotImplementedError

    @property
    def snapshot_info(self) -> MetaFile:
        """Returns the MetaFile for current snapshot metadata

        This property is used by do_timestamp() to update Timestamp.meta:
        Repository implementations should override this property to enable
        do_timestamp().
        """
        raise NotImplementedError

    @contextmanager
    def edit(self, role: str) -> Generator[Signed, None, None]:
        """Context manager for editing a role's metadata

        Context manager takes care of loading the roles metadata (or creating
        new metadata), updating expiry and version. The caller can do
        other changes to the Signed object and when the context manager exits,
        a new version of the roles metadata is stored.

        Context manager user can raise AbortEdit from inside the with-block to
        cancel the edit: in this case none of the changes are stored.
        """
        md = self.open(role)
        with suppress(AbortEdit):
            yield md.signed
            self.close(role, md)

    @contextmanager
    def edit_root(self) -> Generator[Root, None, None]:
        """Context manager for editing root metadata. See edit()"""
        with self.edit(Root.type) as root:
            if not isinstance(root, Root):
                raise AssertionError("Unexpected root type")
            yield root

    @contextmanager
    def edit_timestamp(self) -> Generator[Timestamp, None, None]:
        """Context manager for editing timestamp metadata. See edit()"""
        with self.edit(Timestamp.type) as timestamp:
            if not isinstance(timestamp, Timestamp):
                raise AssertionError("Unexpected timestamp type")
            yield timestamp

    @contextmanager
    def edit_snapshot(self) -> Generator[Snapshot, None, None]:
        """Context manager for editing snapshot metadata. See edit()"""
        with self.edit(Snapshot.type) as snapshot:
            if not isinstance(snapshot, Snapshot):
                raise AssertionError("Unexpected snapshot type")
            yield snapshot

    @contextmanager
    def edit_targets(
        self, rolename: str = Targets.type
    ) -> Generator[Targets, None, None]:
        """Context manager for editing targets metadata. See edit()"""
        with self.edit(rolename) as targets:
            if not isinstance(targets, Targets):
                raise AssertionError(f"Unexpected targets ({rolename}) type")
            yield targets

    def root(self) -> Root:
        """Read current root metadata"""
        root = self.open(Root.type).signed
        if not isinstance(root, Root):
            raise AssertionError("Unexpected root type")
        return root

    def timestamp(self) -> Timestamp:
        """Read current timestamp metadata"""
        timestamp = self.open(Timestamp.type).signed
        if not isinstance(timestamp, Timestamp):
            raise AssertionError("Unexpected timestamp type")
        return timestamp

    def snapshot(self) -> Snapshot:
        """Read current snapshot metadata"""
        snapshot = self.open(Snapshot.type).signed
        if not isinstance(snapshot, Snapshot):
            raise AssertionError("Unexpected snapshot type")
        return snapshot

    def targets(self, rolename: str = Targets.type) -> Targets:
        """Read current targets metadata"""
        targets = self.open(rolename).signed
        if not isinstance(targets, Targets):
            raise AssertionError("Unexpected targets type")
        return targets

    def do_snapshot(
        self, force: bool = False
    ) -> tuple[bool, dict[str, MetaFile]]:
        """Update snapshot meta information

        Updates the snapshot meta information according to current targets
        metadata state and the current snapshot meta information.

        Arguments:
            force: should new snapshot version be created even if meta
                information would not change?

        Returns: Tuple of
            - True if snapshot was created, False if not
            - MetaFiles for targets versions removed from snapshot meta
        """

        # Snapshot update is needed if
        # * any targets files are not yet in snapshot or
        # * any targets version is incorrect
        update_version = force
        removed: dict[str, MetaFile] = {}

        root = self.root()
        snapshot_md = self.open(Snapshot.type)

        try:
            root.verify_delegate(
                Snapshot.type,
                snapshot_md.signed_bytes,
                snapshot_md.signatures,
            )
        except UnsignedMetadataError:
            update_version = True

        with self.edit_snapshot() as snapshot:
            for keyname, new_meta in self.targets_infos.items():
                if keyname not in snapshot.meta:
                    update_version = True
                    snapshot.meta[keyname] = deepcopy(new_meta)
                    continue

                old_meta = snapshot.meta[keyname]
                if new_meta.version < old_meta.version:
                    raise ValueError(f"{keyname} version rollback")
                if new_meta.version > old_meta.version:
                    update_version = True
                    snapshot.meta[keyname] = deepcopy(new_meta)
                    removed[keyname] = old_meta

            if not update_version:
                # prevent edit_snapshot() from storing a new version
                raise AbortEdit("Skip snapshot: No targets version changes")

        if not update_version:
            # this is reachable as edit_snapshot() handles AbortEdit
            logger.debug("Snapshot update not needed")  # type: ignore[unreachable]
        else:
            logger.debug("Snapshot v%d", snapshot.version)

        return update_version, removed

    def do_timestamp(self, force: bool = False) -> tuple[bool, MetaFile | None]:
        """Update timestamp meta information

        Updates timestamp according to current snapshot state

        Returns: Tuple of
            - True if timestamp was created, False if not
            - MetaFile for snapshot version removed from timestamp (if any)
        """
        update_version = force
        removed = None

        root = self.root()
        timestamp_md = self.open(Timestamp.type)

        try:
            root.verify_delegate(
                Timestamp.type,
                timestamp_md.signed_bytes,
                timestamp_md.signatures,
            )
        except UnsignedMetadataError:
            update_version = True

        with self.edit_timestamp() as timestamp:
            if self.snapshot_info.version < timestamp.snapshot_meta.version:
                raise ValueError("snapshot version rollback")

            if self.snapshot_info.version > timestamp.snapshot_meta.version:
                update_version = True
                removed = timestamp.snapshot_meta
                timestamp.snapshot_meta = deepcopy(self.snapshot_info)

            if not update_version:
                raise AbortEdit("Skip timestamp: No snapshot version changes")

        if not update_version:
            # this is reachable as edit_timestamp() handles AbortEdit
            logger.debug("Timestamp update not needed")  # type: ignore[unreachable]
        else:
            logger.debug("Timestamp v%d", timestamp.version)
        return update_version, removed