1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315
|
#!/usr/bin/env python3
# This file is part of cloud-init. See LICENSE file for license information.
"""Query standardized instance metadata provided to machine, returning a JSON
structure.
Some instance-data values may be binary on some platforms, such as userdata and
vendordata. Attempt to decompress and decode UTF-8 any binary values.
Any binary values in the instance metadata will be base64-encoded and prefixed
with "ci-b64:" in the output. userdata and, where applicable, vendordata may
be provided to the machine gzip-compressed (and therefore as binary data).
query will attempt to decompress these to a string before emitting the JSON
output; if this fails, they are treated as binary.
"""
import argparse
import os
import sys
from errno import EACCES
from cloudinit import log, util
from cloudinit.cmd.devel import addLogHandlerCLI, read_cfg_paths
from cloudinit.handlers.jinja_template import (
convert_jinja_instance_data,
get_jinja_variable_alias,
render_jinja_payload,
)
from cloudinit.sources import REDACT_SENSITIVE_VALUE
NAME = "query"
LOG = log.getLogger(NAME)
def get_parser(parser=None):
    """Return an argument parser configured for the query utility.

    When no parser (or a falsy one) is supplied, a new ArgumentParser is
    created with NAME as its program name and the module docstring as its
    description. Otherwise the supplied parser is extended in place.

    @param parser: Optional existing ArgumentParser instance representing the
        query subcommand, to be extended with this utility's arguments.
    @returns: The ArgumentParser with all query options and the optional
        positional ``varname`` registered.
    """
    if not parser:
        parser = argparse.ArgumentParser(prog=NAME, description=__doc__)
    register = parser.add_argument

    register(
        "-d",
        "--debug",
        action="store_true",
        default=False,
        help="Add verbose messages during template render",
    )

    # The default advertised in the help text is looked up from the
    # configured paths each time the parser is built.
    default_instance_data = read_cfg_paths().get_runpath("instance_data")
    register(
        "-i",
        "--instance-data",
        type=str,
        help=(
            "Path to instance-data.json file."
            f" Default is {default_instance_data}"
        ),
    )

    register(
        "-l",
        "--list-keys",
        action="store_true",
        default=False,
        help=(
            "List query keys available at the provided"
            " instance-data <varname>."
        ),
    )

    register(
        "-u",
        "--user-data",
        type=str,
        help=(
            "Path to user-data file."
            " Default is /var/lib/cloud/instance/user-data.txt"
        ),
    )

    register(
        "-v",
        "--vendor-data",
        type=str,
        help=(
            "Path to vendor-data file."
            " Default is /var/lib/cloud/instance/vendor-data.txt"
        ),
    )

    # Optional positional: the dot-delimited key path to look up.
    register(
        "varname",
        type=str,
        nargs="?",
        help=(
            "A dot-delimited specific variable to query from instance-data."
            " For example: v1.local_hostname."
            " If the value is not JSON serializable, it will be"
            ' base64-encoded and will contain the prefix "ci-b64:". '
        ),
    )

    register(
        "-a",
        "--all",
        action="store_true",
        default=False,
        dest="dump_all",
        help="Dump all available instance-data",
    )

    register(
        "-f",
        "--format",
        type=str,
        dest="format",
        help=(
            "Optionally specify a custom output format string."
            " Any instance-data variable can be specified between"
            ' double-curly braces. For example -f "{{ v2.cloud_name }}"'
        ),
    )
    return parser
def load_userdata(ud_file_path):
    """Load user-data from ud_file_path, preferring a UTF-8 string.

    The file is read undecoded via util.load_file(decode=False) and decoded
    as UTF-8. If that decode raises UnicodeDecodeError, the raw content is
    handed to util.decomp_gzip(quiet=False, decode=True) and whatever that
    helper returns is returned.

    NOTE(review): what happens when the content is neither UTF-8 nor gzip
    depends on util.decomp_gzip with quiet=False, which is not visible here.
    It presumably raises rather than returning raw bytes; confirm against
    the helper.

    @param ud_file_path: Path of the user-data (or vendor-data) file to read.
    @returns: Decoded text when UTF-8 decoding succeeds, otherwise the result
        of util.decomp_gzip.
    """
    raw = util.load_file(ud_file_path, decode=False)
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # Not valid UTF-8: treat the payload as gzip-compressed content.
        text = util.decomp_gzip(raw, quiet=False, decode=True)
    return text
def _read_instance_data(instance_data, user_data, vendor_data) -> dict:
    """Return a dict of instance-data merged with userdata and vendordata.

    File selection:
      - instance-data: the explicit ``instance_data`` path when given.
        Otherwise root (uid 0) uses the "instance_data_sensitive" run path
        if that file exists, falling back (with a warning) to the redacted
        "instance_data" run path. Non-root always uses the redacted path.
      - user-data / vendor-data: the explicit paths when given, otherwise
        "user-data.txt" / "vendor-data.txt" under the instance link.

    The returned dict always has "userdata" and "vendordata" keys. For root
    they hold the loaded file content; for any other uid they hold a
    redaction placeholder naming the file.

    :raise: IOError/OSError when the instance-data file cannot be loaded.
        An error is logged first and the exception is re-raised.
    """
    is_root = os.getuid() == 0
    cfg_paths = read_cfg_paths()

    source_fn = instance_data
    if not source_fn:
        # Start from the redacted file; root upgrades to the sensitive one
        # when it is present.
        source_fn = cfg_paths.get_runpath("instance_data")
        if is_root:
            sensitive_fn = cfg_paths.get_runpath("instance_data_sensitive")
            if os.path.exists(sensitive_fn):
                source_fn = sensitive_fn
            else:
                LOG.warning(
                    "Missing root-readable %s. Using redacted %s instead.",
                    sensitive_fn,
                    source_fn,
                )

    user_data_fn = user_data or os.path.join(
        cfg_paths.instance_link, "user-data.txt"
    )
    vendor_data_fn = vendor_data or os.path.join(
        cfg_paths.instance_link, "vendor-data.txt"
    )

    try:
        raw_json = util.load_file(source_fn)
    except (IOError, OSError) as err:
        if err.errno == EACCES:
            LOG.error("No read permission on '%s'. Try sudo", source_fn)
        else:
            LOG.error("Missing instance-data file: %s", source_fn)
        raise

    merged = util.load_json(raw_json)
    supplemental = (
        ("userdata", user_data_fn),
        ("vendordata", vendor_data_fn),
    )
    for key, data_fn in supplemental:
        if is_root:
            merged[key] = load_userdata(data_fn)
        else:
            # Non-root callers only see a placeholder pointing at the file.
            merged[key] = "<%s> file:%s" % (REDACT_SENSITIVE_VALUE, data_fn)
    return merged
def _find_instance_data_leaf_by_varname_path(
jinja_vars_without_aliases: dict,
jinja_vars_with_aliases: dict,
varname: str,
list_keys: bool,
):
"""Return the value of the dot-delimited varname path in instance-data
Split a dot-delimited jinja variable name path into components, walk the
path components into the instance_data and look up a matching jinja
variable name or cloud-init's underscore-delimited key aliases.
:raises: ValueError when varname represents an invalid key name or path or
if list-keys is provided by varname isn't a dict object.
"""
walked_key_path = ""
response = jinja_vars_without_aliases
for key_path_part in varname.split("."):
try:
# Walk key path using complete aliases dict, yet response
# should only contain jinja_without_aliases
jinja_vars_with_aliases = jinja_vars_with_aliases[key_path_part]
except KeyError as e:
if walked_key_path:
msg = "instance-data '{key_path}' has no '{leaf}'".format(
leaf=key_path_part, key_path=walked_key_path
)
else:
msg = "Undefined instance-data key '{}'".format(varname)
raise ValueError(msg) from e
if key_path_part in response:
response = response[key_path_part]
else: # We are an underscore_delimited key alias
for key in response:
if get_jinja_variable_alias(key) == key_path_part:
response = response[key]
break
if walked_key_path:
walked_key_path += "."
walked_key_path += key_path_part
return response
def handle_args(name, args):
    """Handle calls to 'cloud-init query' as a subcommand.

    @param name: Subcommand name; not consulted by this function.
    @param args: Parsed arguments as produced by the parser from get_parser().
    @returns: 0 on success. 1 when no query option was given, the
        instance-data could not be read, the format rendered to nothing,
        the varname path is invalid, or --list-keys targets a non-dict.
    """
    addLogHandlerCLI(LOG, log.DEBUG if args.debug else log.WARNING)

    requested = [args.list_keys, args.varname, args.format, args.dump_all]
    if not any(requested):
        LOG.error(
            "Expected one of the options:"
            " --all, --format, --list-keys or varname"
        )
        get_parser().print_help()
        return 1

    try:
        instance_data = _read_instance_data(
            args.instance_data, args.user_data, args.vendor_data
        )
    except (IOError, OSError):
        # The failure was already logged by _read_instance_data.
        return 1

    if args.format:
        template = "## template: jinja\n{fmt}".format(fmt=args.format)
        rendered = render_jinja_payload(
            payload=template,
            payload_fn="query commandline",
            instance_data=instance_data,
            debug=bool(args.debug),
        )
        if not rendered:
            return 1
        print(rendered)
        return 0

    # Without a custom format, the output is one of:
    # - a JSON dump of all instance-data/jinja variables
    # - a JSON dump of the value at a dict path into the instance-data dict
    # - a list of the keys at a dict path into the instance-data dict
    result = convert_jinja_instance_data(instance_data)
    if args.varname:
        aliased_vars = convert_jinja_instance_data(
            instance_data, include_key_aliases=True
        )
        try:
            result = _find_instance_data_leaf_by_varname_path(
                jinja_vars_without_aliases=result,
                jinja_vars_with_aliases=aliased_vars,
                varname=args.varname,
                list_keys=args.list_keys,
            )
        except (KeyError, ValueError) as err:
            LOG.error(err)
            return 1

    if args.list_keys:
        if not isinstance(result, dict):
            LOG.error(
                "--list-keys provided but '%s' is not a dict", args.varname
            )
            return 1
        result = "\n".join(sorted(result))

    # Strings are printed as-is; anything else is serialized to JSON first.
    output = result if isinstance(result, str) else util.json_dumps(result)
    print(output)
    return 0
def main():
    """Tool to query specific instance-data values.

    Parses the command line with the parser from get_parser(), runs
    handle_args() and exits the process with its return code.
    """
    cli_args = get_parser().parse_args()
    exit_code = handle_args(NAME, cli_args)
    sys.exit(exit_code)


# Run the query tool only when this module is executed as a script.
if __name__ == "__main__":
    main()
|