File: DataSourceCloudCIX.py

package info (click to toggle)
cloud-init 25.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 12,412 kB
  • sloc: python: 135,894; sh: 3,883; makefile: 141; javascript: 30; xml: 22
file content (167 lines) | stat: -rw-r--r-- 5,311 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# This file is part of cloud-init. See LICENSE file for license information.

import json
import logging
from typing import Optional

from cloudinit import dmi, sources, url_helper, util

# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Candidate base URLs handed to url_helper.wait_for_url() when locating the
# metadata server.
METADATA_URLS = ["http://169.254.169.254"]
# Highest metadata API version probed; determine_md_url() counts down from
# this value to 1 looking for a "v<N>/metadata" endpoint.
METADATA_VERSION = 1

# Value the DMI "system-product-name" field is compared against in
# is_platform_viable().
CLOUDCIX_DMI_NAME = "CloudCIX"


class DataSourceCloudCIX(sources.DataSource):
    """Datasource that fetches meta-data and user-data for CloudCIX.

    The metadata server is located by probing METADATA_URLS; the versioned
    endpoint ``<base>/v<N>`` that answers is cached and then read through
    read_metadata().
    """

    dsname = "CloudCIX"
    # Setup read_url parameters through get_url_params()
    url_retries = 3
    # Historical attribute name, kept so existing readers of it still work.
    url_timeout_seconds = 5
    # get_url_params() on the DataSource base class reads ``url_timeout``
    # (not ``url_timeout_seconds``), so the 5 second timeout has to be
    # published under that name to take effect.
    url_timeout = url_timeout_seconds
    url_sec_between_retries = 5

    def __init__(self, sys_cfg, distro, paths):
        super().__init__(sys_cfg, distro, paths)
        # Versioned metadata base URL, cached by determine_md_url().
        self._metadata_url = None
        # Network configuration, cached by the network_config property.
        self._net_cfg = None

    def _get_data(self):
        """
        Fetch the user data and the metadata

        :returns: True when both were retrieved and stored on the instance,
                  False when crawling raised InvalidMetaDataException.
        """
        try:
            crawled_data = self.crawl_metadata_service()
        except sources.InvalidMetaDataException as error:
            LOG.error(
                "Failed to read data from CloudCIX datasource: %s", error
            )
            return False

        self.metadata = crawled_data["meta-data"]
        self.userdata_raw = util.decode_binary(crawled_data["user-data"])

        return True

    def crawl_metadata_service(self) -> dict:
        """
        Locate the metadata server and read meta-data and user-data from it

        :returns: the dictionary produced by read_metadata()
        :raises: InvalidMetaDataException when no metadata URL can be
                 determined, or when read_metadata() raises it.
        """
        md_url = self.determine_md_url()
        if md_url is None:
            raise sources.InvalidMetaDataException(
                "Could not determine metadata URL"
            )

        return read_metadata(md_url, self.get_url_params())

    def determine_md_url(self) -> Optional[str]:
        """
        Find the versioned base URL of the metadata server

        The result is cached on the instance, so the probing only happens
        until a URL has been found.

        :returns: ``<base_url>/v<N>`` for the highest version that answered,
                  or None when no server or no version could be reached.
        """
        if self._metadata_url:
            return self._metadata_url

        # Try to reach the metadata server
        url_params = self.get_url_params()
        base_url, _ = url_helper.wait_for_url(
            METADATA_URLS,
            max_wait=url_params.max_wait_seconds,
            timeout=url_params.timeout_seconds,
        )
        if not base_url:
            return None

        # Find the highest supported metadata version
        for version in range(METADATA_VERSION, 0, -1):
            version_leaf = "v{0}".format(version)
            url = url_helper.combine_url(base_url, version_leaf, "metadata")
            try:
                # Use the resolved URL parameters so the probe honours the
                # same timeout as the wait_for_url() call above.
                response = url_helper.readurl(
                    url, timeout=url_params.timeout_seconds
                )
            except url_helper.UrlError as e:
                LOG.debug("URL %s raised exception %s", url, e)
                continue

            if response.ok():
                self._metadata_url = url_helper.combine_url(
                    base_url, version_leaf
                )
                break
            else:
                LOG.debug("No metadata found at URL %s", url)

        return self._metadata_url

    @staticmethod
    def ds_detect():
        """Return the result of is_platform_viable()."""
        return is_platform_viable()

    @property
    def network_config(self):
        """
        Network configuration taken from the "network" key of the metadata

        :returns: the cached or freshly read "network" value, or None when
                  there is no metadata or it has no "network" entry.
        """
        if self._net_cfg:
            return self._net_cfg

        if not self.metadata:
            return None

        # A missing key yields None rather than a KeyError out of a property.
        net_cfg = self.metadata.get("network")
        if net_cfg is None:
            LOG.debug("CloudCIX metadata contains no network configuration")
        self._net_cfg = net_cfg
        return self._net_cfg


def is_platform_viable() -> bool:
    """Tell whether the DMI system product name equals CLOUDCIX_DMI_NAME."""
    product_name = dmi.read_dmi_data("system-product-name")
    return product_name == CLOUDCIX_DMI_NAME


def read_metadata(base_url: str, url_params):
    """
    Read metadata from metadata server at base_url

    The ``metadata`` leaf is parsed with util.load_json and stored under
    ``meta-data``; the ``userdata`` leaf is passed through
    util.maybe_b64decode and stored under ``user-data``.

    :returns: dictionary of retrieved metadata and user data containing the
              following keys: meta-data, user-data
    :param: base_url: meta data server's base URL
    :param: url_params: object carrying the URL retrieval parameters as the
            attributes `num_retries`, `sec_between_retries` and
            `timeout_seconds`.
    :raises: InvalidMetaDataException upon network error connecting to metadata
             URL, error response from meta data server or failure to
             decode/parse the metadata payload as JSON.
    """
    md = {}
    # (URL leaf to fetch, key to store it under, converter for the payload)
    leaf_key_format_callback = (
        ("metadata", "meta-data", util.load_json),
        ("userdata", "user-data", util.maybe_b64decode),
    )

    for url_leaf, new_key, format_callback in leaf_key_format_callback:
        try:
            response = url_helper.readurl(
                url=url_helper.combine_url(base_url, url_leaf),
                retries=url_params.num_retries,
                sec_between=url_params.sec_between_retries,
                timeout=url_params.timeout_seconds,
            )
        except url_helper.UrlError as error:
            # Chain the cause so the original network error stays visible
            # in tracebacks, matching the JSON error handling below.
            raise sources.InvalidMetaDataException(
                f"Failed to fetch IMDS {url_leaf}: "
                f"{base_url}/{url_leaf}: {error}"
            ) from error

        if not response.ok():
            raise sources.InvalidMetaDataException(
                f"No valid {url_leaf} found. "
                f"URL {base_url}/{url_leaf} returned code {response.code}"
            )

        try:
            md[new_key] = format_callback(response.contents)
        except json.decoder.JSONDecodeError as exc:
            raise sources.InvalidMetaDataException(
                f"Invalid JSON at {base_url}/{url_leaf}: {exc}"
            ) from exc
    return md


# Used to match classes to dependencies: each entry pairs a datasource class
# with the tuple of sources.DEP_* dependencies listed for it. This list is
# what get_datasource_list() passes to sources.list_from_depends().
datasources = [
    (DataSourceCloudCIX, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)),
]


# Return a list of data sources that match this set of dependencies
def get_datasource_list(depends):
    """Filter the module's ``datasources`` by the given dependencies.

    Delegates to sources.list_from_depends() and returns its result
    unchanged.
    """
    matching = sources.list_from_depends(depends, datasources)
    return matching