File: storage.py

package info (click to toggle)
python-securesystemslib 1.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,316 kB
  • sloc: python: 5,319; sh: 38; makefile: 5
file content (285 lines) | stat: -rw-r--r-- 9,229 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
<Program Name>
  storage.py

<Author>
  Joshua Lock <jlock@vmware.com>

<Started>
  April 9, 2020

<Copyright>
  See LICENSE for licensing information.

<Purpose>
  Provides an interface for filesystem interactions, StorageBackendInterface.
"""

from __future__ import annotations

import errno
import logging
import os
import shutil
import stat
from abc import ABCMeta, abstractmethod
from collections.abc import Iterator
from contextlib import contextmanager
from typing import IO, Any, BinaryIO

from securesystemslib import exceptions

logger = logging.getLogger(__name__)


class StorageBackendInterface(metaclass=ABCMeta):
    """
    <Purpose>
      Abstract contract for storage operations. Concrete backends implement
      every method below for one particular storage solution, for example a
      local or a remote filesystem.
    """

    @abstractmethod
    @contextmanager
    def get(self, filepath: str) -> Iterator[BinaryIO]:
        """
        <Purpose>
          Context manager, meant for 'with' statements, that hands out a
          file-like object for a stored file and cleans it up again on exit.

            with storage_backend.get('/path/to/file') as file_object:
              # operations
            # file is now closed

        <Arguments>
          filepath:
            Full path of the file to retrieve.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file does not exist
          or is not accessible.

        <Returns>
          A ContextManager object that emits a file-like object for the file
          at 'filepath'.
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def put(self, fileobj: IO, filepath: str, restrict: bool | None = False) -> None:
        """
        <Purpose>
          Write the contents of a file-like object to the storage backend.
          Reading always starts at the beginning of 'fileobj', regardless of
          its current offset (if any).

        <Arguments>
          fileobj:
            File-like object whose contents are stored.

          filepath:
            Full path of the location 'fileobj' is stored at.

          restrict:
            If true, create the file with restricted permissions. The meaning
            of "restricted" is up to the backend: on a UNIX-like filesystem it
            may be read/write for the owning user only (octal mode 0o600),
            whereas a cloud storage system will likely apply provider specific
            ACL restrictions.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file can not be
          stored.

        <Returns>
          None
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def remove(self, filepath: str) -> None:
        """
        <Purpose>
          Delete the file stored at 'filepath'.

        <Arguments>
          filepath:
            Full path of the file to delete.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file can not be
          removed.

        <Returns>
          None
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def getsize(self, filepath: str) -> int:
        """
        <Purpose>
          Report how large the file at 'filepath' is, in bytes.

        <Arguments>
          filepath:
            Full path of the file to measure.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file does not exist
          or is not accessible.

        <Returns>
          The size of the file at 'filepath', in bytes.
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def create_folder(self, filepath: str) -> None:
        """
        <Purpose>
          Create the folder 'filepath', making sure that every intermediate
          component of the path exists as well.

          NOTE(review): this interface has been documented as treating an empty
          'filepath' as a silent no-op, but FilesystemBackend raises
          StorageError for an empty string. Confirm which contract is intended.

        <Arguments>
          filepath:
            Full path of the folder to create.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the folder can not be
          created.

        <Returns>
          None
        """
        raise NotImplementedError()  # pragma: no cover

    @abstractmethod
    def list_folder(self, filepath: str) -> list[str]:
        """
        <Purpose>
          Enumerate what is stored in the folder at 'filepath'.

        <Arguments>
          filepath:
            Full path of the folder to list.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the folder does not
          exist or is not accessible.

        <Returns>
          A list with the names of the files in the folder. The list may be
          empty.
        """
        raise NotImplementedError()  # pragma: no cover


class FilesystemBackend(StorageBackendInterface):
    """
    <Purpose>
      A concrete implementation of StorageBackendInterface which interacts with
      local filesystems using Python standard library functions.

      Every low-level OSError is reported to the caller as
      securesystemslib.exceptions.StorageError, with the original error chained
      as its cause.
    """

    # As FilesystemBackend is effectively a stateless wrapper around various
    # standard library operations, we only ever need a single instance of it.
    # That single instance is safe to be (re-)used by all callers. Therefore
    # implement the singleton pattern to avoid unnecessarily creating multiple
    # objects.
    _instance = None

    def __new__(cls, *args: Any, **kwargs: Any) -> FilesystemBackend:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = object.__new__(cls, *args, **kwargs)
        return cls._instance

    @contextmanager
    def get(self, filepath: str) -> Iterator[BinaryIO]:
        """
        <Purpose>
          Open the local file 'filepath' for binary reading and close it again
          when the 'with' block is left.

        <Arguments>
          filepath:
            The full path of the file to be retrieved.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file can not be
          opened. An OSError raised inside the caller's 'with' body is reported
          as StorageError as well, because the 'yield' sits inside the guarded
          region.

        <Returns>
          A ContextManager object that emits the opened binary file object.
        """
        try:
            # The 'with' statement closes the file on every exit path.
            with open(filepath, "rb") as file_object:
                yield file_object
        except OSError as e:
            raise exceptions.StorageError(f"Can't open {filepath}") from e

    def put(self, fileobj: IO, filepath: str, restrict: bool | None = False) -> None:
        """
        <Purpose>
          Copy the full contents of 'fileobj' to the local file 'filepath' and
          force the data to disk.

        <Arguments>
          fileobj:
            The file-like object to be stored. If it is open it is rewound to
            its beginning first.

          filepath:
            The full path to the location where 'fileobj' will be stored.

          restrict:
            If true, the file is created with read and write permissions for
            the user only (octal mode 0o600).

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file can not be
          written.

        <Returns>
          None
        """
        # If we are passed an open file, seek to the beginning such that we are
        # copying the entire contents
        if not fileobj.closed:
            fileobj.seek(0)

        # If a file with the same name already exists, the new permissions
        # may not be applied, so try to get rid of it first. This is a
        # deliberate best-effort: a missing file is the normal case, and any
        # other failure is handled by opening the destination below.
        try:
            os.remove(filepath)
        except OSError:
            pass

        # O_TRUNC matters when the removal above failed but the existing file
        # can still be opened for writing: without it a shorter payload would
        # leave stale bytes of the previous content at the end of the file.
        open_flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC

        try:
            if restrict:
                # On UNIX-based systems restricted files are created with read and
                # write permissions for the user only (octal value 0o600).
                fd = os.open(filepath, open_flags, stat.S_IRUSR | stat.S_IWUSR)
            else:
                # Non-restricted files use the default 'mode' argument of os.open()
                # granting read, write, and execute for all users (octal mode 0o777).
                # NOTE: mode may be modified by the user's file mode creation mask
                # (umask) or on Windows limited to the smaller set of OS supported
                # permissions.
                fd = os.open(filepath, open_flags)

            with os.fdopen(fd, "wb") as destination_file:
                shutil.copyfileobj(fileobj, destination_file)
                # Force the destination file to be written to disk
                # from Python's internal and the operating system's buffers.
                # os.fsync() should follow flush().
                destination_file.flush()
                os.fsync(destination_file.fileno())
        except OSError as e:
            raise exceptions.StorageError(f"Can't write file {filepath}") from e

    def remove(self, filepath: str) -> None:
        """
        <Purpose>
          Delete the local file 'filepath'.

        <Arguments>
          filepath:
            The full path to the file.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the file can not be
          removed (e.g. it does not exist or permission is denied).

        <Returns>
          None
        """
        try:
            os.remove(filepath)
        # FileNotFoundError and PermissionError are subclasses of OSError.
        except OSError as e:  # pragma: no cover
            raise exceptions.StorageError(f"Can't remove file {filepath}") from e

    def getsize(self, filepath: str) -> int:
        """
        <Purpose>
          Retrieve the size, in bytes, of the local file 'filepath'.

        <Arguments>
          filepath:
            The full path to the file.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the size can not be
          determined.

        <Returns>
          The size in bytes of the file at 'filepath'.
        """
        try:
            return os.path.getsize(filepath)
        except OSError as e:
            raise exceptions.StorageError(f"Can't access file {filepath}") from e

    def create_folder(self, filepath: str) -> None:
        """
        <Purpose>
          Create the directory 'filepath' including all missing intermediate
          directories. An already existing directory is accepted silently.

        <Arguments>
          filepath:
            The full path of the folder to be created.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the folder can not be
          created. This includes an empty 'filepath' and a 'filepath' that
          already exists but is not a directory.

        <Returns>
          None
        """
        try:
            # 'exist_ok=True' tolerates a directory that has already been
            # created, but still raises if 'filepath' exists as something
            # other than a directory.
            os.makedirs(filepath, exist_ok=True)
        except OSError as e:
            if e.errno == errno.ENOENT and not filepath:
                raise exceptions.StorageError(
                    "Can't create a folder with an empty filepath!"
                ) from e
            raise exceptions.StorageError(
                f"Can't create folder at {filepath}"
            ) from e

    def list_folder(self, filepath: str) -> list[str]:
        """
        <Purpose>
          List the contents of the local directory 'filepath'.

        <Arguments>
          filepath:
            The full path of the folder to be listed.

        <Exceptions>
          securesystemslib.exceptions.StorageError, if the folder does not
          exist or is not accessible (e.g. permission denied, or 'filepath' is
          not a directory).

        <Returns>
          A list containing the names of the entries in the folder. May be an
          empty list.
        """
        try:
            return os.listdir(filepath)
        # Catch OSError rather than only FileNotFoundError so that permission
        # and not-a-directory failures are reported as StorageError too.
        except OSError as e:
            raise exceptions.StorageError(f"Can't list folder at {filepath}") from e