File: _download.py

package info (click to toggle)
python-openstacksdk 4.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 14,048 kB
  • sloc: python: 129,267; sh: 153; makefile: 23
file content (172 lines) | stat: -rw-r--r-- 5,742 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections.abc
import hashlib
import io
import typing as ty

from openstack import exceptions
from openstack import utils


def _verify_checksum(
    hasher: ty.Any,
    expected_hash: str | None,
    hash_algo: str | None = None,
) -> None:
    """Verify checksum using the provided hasher.

    :param hasher: A hashlib hash object
    :param expected_hash: The expected hexdigest value
    :param hash_algo: Optional name of the hash algorithm for error messages
    :raises: InvalidResponse if the hash doesn't match
    """
    if expected_hash:
        digest = hasher.hexdigest()
        if digest != expected_hash:
            algo_msg = f" ({hash_algo})" if hash_algo else ""
            raise exceptions.InvalidResponse(
                f"checksum mismatch{algo_msg}: {expected_hash} != {digest}"
            )


def _integrity_iter(
    iterable: collections.abc.Iterable[bytes],
    hasher: ty.Any,
    expected_hash: str | None,
    hash_algo: str | None,
) -> collections.abc.Iterator[bytes]:
    """Check image data integrity

    :param iterable: Iterable containing the image data chunks
    :param hasher: A hashlib hash object
    :param expected_hash: The expected hexdigest value
    :param hash_algo: The hash algorithm
    :yields: Chunks of data while computing hash
    :raises: InvalidResponse if the hash doesn't match
    """
    for chunk in iterable:
        hasher.update(chunk)
        yield chunk
    _verify_checksum(hasher, expected_hash, hash_algo)


def _write_chunks(
    fd: io.IOBase, chunks: collections.abc.Iterable[bytes]
) -> None:
    """Write every chunk, in order, to the given file object.

    :param fd: A writable file object
    :param chunks: Iterable of data chunks to write
    """
    pending = iter(chunks)
    while True:
        try:
            block = next(pending)
        except StopIteration:
            return
        fd.write(block)


class DownloadMixin:
    """Mixin that adds a checksum-verifying ``download()`` method.

    The host class is expected to provide ``id``, ``base_path`` and a
    ``fetch()`` method with the signature declared below.
    """

    id: str
    base_path: str

    # NOTE(review): signature-only placeholder; presumably the concrete
    # fetch() is supplied by the resource class this mixin is combined
    # with -- confirm against the classes that use it.
    def fetch(
        self,
        session,
        requires_id=True,
        base_path=None,
        error_message=None,
        skip_cache=False,
        *,
        resource_response_key=None,
        microversion=None,
        **params,
    ): ...

    def download(
        self, session, stream=False, output=None, chunk_size=1024 * 1024
    ):
        """Download the data contained in an image.

        Checksum validation uses the hash algorithm metadata fields
        (hash_value + hash_algo) if available, otherwise falls back to MD5 via
        'checksum' or 'Content-MD5'. No validation is performed if neither is
        available; a warning is logged instead.

        ``output`` takes precedence over ``stream``: when ``output`` is given
        the data is written (and verified) chunk by chunk. Otherwise, when
        ``stream`` is true the response is returned unread and unverified.
        Otherwise the whole body is read into memory and verified.

        :param session: The session used to fetch the metadata and the data
        :param stream: Passed to ``session.get()``; when true and no
            ``output`` is given, the response is returned without reading or
            verifying the body
        :param output: Either a file object (an ``io.IOBase`` instance) or a
            path that is opened for binary writing
        :param chunk_size: Chunk size used to iterate over the response when
            writing to ``output``
        :returns: The response object returned by ``session.get()``
        :raises: SDKException if ``hashlib.new()`` fails with a ValueError
            other than 'unsupported hash type', or if anything fails while
            writing to ``output`` (a checksum mismatch on that path is
            wrapped as well)
        :raises: InvalidResponse if the checksum doesn't match when the body
            is read into memory
        """

        # Fetch image metadata first to get hash info before downloading.
        # This prevents race conditions and the need for a second conditional
        # metadata retrieval if Content-MD5 is missing (story/1619675).
        details = self.fetch(session)
        meta_checksum = getattr(details, 'checksum', None)
        meta_hash_value = getattr(details, 'hash_value', None)
        meta_hash_algo = getattr(details, 'hash_algo', None)

        url = utils.urljoin(self.base_path, self.id, 'file')
        resp = session.get(url, stream=stream)

        hasher = None
        expected_hash = None
        hash_algo = None
        header_checksum = resp.headers.get("Content-MD5")

        if meta_hash_value and meta_hash_algo:
            try:
                hasher = hashlib.new(str(meta_hash_algo))
                expected_hash = meta_hash_value
                hash_algo = meta_hash_algo
            except ValueError as ve:
                # An 'unsupported hash type' error is tolerated so that the
                # MD5 fallback below can still be attempted; any other
                # ValueError is reported to the caller.
                if not str(ve).startswith('unsupported hash type'):
                    raise exceptions.SDKException(
                        f"Unsupported hash algorithm '{meta_hash_algo}': {ve}"
                    ) from ve

        # Fall back to MD5 from metadata or header
        if hasher is None:
            md5_source = meta_checksum or header_checksum
            if md5_source:
                hasher = hashlib.md5(usedforsecurity=False)
                expected_hash = md5_source
                hash_algo = 'md5'

        if hasher is None:
            session.log.warning(
                "Unable to verify the integrity of image %s "
                "- no hash available",
                self.id,
            )

        if output:
            try:
                chunks = resp.iter_content(chunk_size=chunk_size)
                if hasher is not None:
                    # Hash the data as it is written; the digest is checked
                    # once the last chunk has been consumed.
                    chunks = _integrity_iter(
                        chunks, hasher, expected_hash, hash_algo
                    )

                if isinstance(output, io.IOBase):
                    _write_chunks(output, chunks)
                else:
                    with open(output, 'wb') as fd:
                        _write_chunks(fd, chunks)

                return resp
            except Exception as e:
                raise exceptions.SDKException(
                    f"Unable to download image: {e}"
                ) from e

        if stream:
            # Set content-md5 header for backward compatibility with callers
            # who expect hash info in the response when streaming
            if hash_algo == 'md5' and expected_hash:
                resp.headers['content-md5'] = expected_hash
            return resp

        if hasher is not None:
            # Loads entire image into memory!
            hasher.update(resp.content)
            _verify_checksum(hasher, expected_hash, hash_algo)

        return resp