1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
# This file is part of cloud-init. See LICENSE file for license information.
"""Query standardized instance metadata provided to machine, returning a JSON
structure.
Some instance-data values may be binary on some platforms, such as userdata and
vendordata. Attempt to decompress and decode UTF-8 any binary values.
Any binary values in the instance metadata will be base64-encoded and prefixed
with "ci-b64:" in the output. userdata and, where applicable, vendordata may
be provided to the machine gzip-compressed (and therefore as binary data).
query will attempt to decompress these to a string before emitting the JSON
output; if this fails, they are treated as binary.
"""
import argparse
from errno import EACCES
import os
import sys
from cloudinit.handlers.jinja_template import (
convert_jinja_instance_data, render_jinja_payload)
from cloudinit.cmd.devel import addLogHandlerCLI, read_cfg_paths
from cloudinit import log
from cloudinit.sources import (
INSTANCE_JSON_FILE, INSTANCE_JSON_SENSITIVE_FILE, REDACT_SENSITIVE_VALUE)
from cloudinit import util
# Name of this subcommand; reused as the argparse ``prog`` and logger name.
NAME = 'query'
# Module-level logger used for all diagnostics emitted by this utility.
LOG = log.getLogger(NAME)
def get_parser(parser=None):
    """Return an ArgumentParser carrying every option of the query utility.

    @param parser: Optional ArgumentParser for the query subcommand. When it
        is omitted (or falsy) a standalone parser named after NAME is built.
    @returns: The parser with all query arguments registered on it.
    """
    if not parser:
        parser = argparse.ArgumentParser(prog=NAME, description=__doc__)

    # Each entry is (positional names/flags, add_argument keyword options).
    # Registration order is preserved because it drives the --help layout.
    argument_specs = (
        (('-d', '--debug'), {
            'action': 'store_true',
            'default': False,
            'help': 'Add verbose messages during template render',
        }),
        (('-i', '--instance-data'), {
            'type': str,
            'help': (
                'Path to instance-data.json file. Default is'
                ' /run/cloud-init/%s' % INSTANCE_JSON_FILE),
        }),
        (('-l', '--list-keys'), {
            'action': 'store_true',
            'default': False,
            'help': ('List query keys available at the provided instance-data'
                     ' <varname>.'),
        }),
        (('-u', '--user-data'), {
            'type': str,
            'help': ('Path to user-data file. Default is'
                     ' /var/lib/cloud/instance/user-data.txt'),
        }),
        (('-v', '--vendor-data'), {
            'type': str,
            'help': ('Path to vendor-data file. Default is'
                     ' /var/lib/cloud/instance/vendor-data.txt'),
        }),
        (('varname',), {
            'type': str,
            'nargs': '?',
            'help': ('A dot-delimited specific variable to query from'
                     ' instance-data. For example: v1.local_hostname. If the'
                     ' value is not JSON serializable, it will be'
                     ' base64-encoded and'
                     ' will contain the prefix "ci-b64:". '),
        }),
        (('-a', '--all'), {
            'action': 'store_true',
            'default': False,
            'dest': 'dump_all',
            'help': 'Dump all available instance-data',
        }),
        (('-f', '--format'), {
            'type': str,
            'dest': 'format',
            'help': ('Optionally specify a custom output format string. Any'
                     ' instance-data variable can be specified between'
                     ' double-curly'
                     ' braces. For example -f "{{ v2.cloud_name }}"'),
        }),
    )
    for names, options in argument_specs:
        parser.add_argument(*names, **options)
    return parser
def load_userdata(ud_file_path):
    """Load user-data from ud_file_path, preferring a text result.

    The file is read as raw bytes. If those bytes are valid UTF-8 the decoded
    string is returned; otherwise the bytes are handed to util.decomp_gzip
    (quiet=False, decode=True) and whatever it produces is returned.

    NOTE(review): what happens for content that is neither UTF-8 nor gzip
    depends on util.decomp_gzip with quiet=False - confirm whether it returns
    the raw bytes or raises.

    @param ud_file_path: Path of the user-data (or vendor-data) file to read.
    @returns: String of uncompressed userdata if possible.
    """
    raw_content = util.load_file(ud_file_path, decode=False)
    try:
        content = raw_content.decode('utf-8')
    except UnicodeDecodeError:
        # Not plain text: treat the payload as gzip-compressed data.
        content = util.decomp_gzip(raw_content, quiet=False, decode=True)
    return content
def handle_args(name, args):
    """Handle calls to 'cloud-init query' as a subcommand.

    Resolves which instance-data, user-data and vendor-data files to use,
    loads the instance-data JSON, then either renders the --format template
    or prints the requested variable / key listing / full dump.

    @param name: Name of the subcommand being handled (unused here).
    @param args: Parsed argparse namespace providing debug, list_keys,
        varname, format, dump_all, instance_data, user_data and vendor_data.
    @returns: 0 on success, 1 on any usage or lookup error.
    """
    paths = None
    addLogHandlerCLI(LOG, log.DEBUG if args.debug else log.WARNING)
    if not any([args.list_keys, args.varname, args.format, args.dump_all]):
        LOG.error(
            'Expected one of the options: --all, --format,'
            ' --list-keys or varname')
        get_parser().print_help()
        return 1

    uid = os.getuid()
    # Configured paths are only needed when at least one file path was not
    # given explicitly on the command line.
    if not all([args.instance_data, args.user_data, args.vendor_data]):
        paths = read_cfg_paths()
    if args.instance_data:
        instance_data_fn = args.instance_data
    else:
        redacted_data_fn = os.path.join(paths.run_dir, INSTANCE_JSON_FILE)
        if uid == 0:
            # Root prefers the sensitive file and falls back to the redacted
            # one (with a warning) when the sensitive file does not exist.
            sensitive_data_fn = os.path.join(
                paths.run_dir, INSTANCE_JSON_SENSITIVE_FILE)
            if os.path.exists(sensitive_data_fn):
                instance_data_fn = sensitive_data_fn
            else:
                LOG.warning(
                    'Missing root-readable %s. Using redacted %s instead.',
                    sensitive_data_fn, redacted_data_fn
                )
                instance_data_fn = redacted_data_fn
        else:
            instance_data_fn = redacted_data_fn
    if args.user_data:
        user_data_fn = args.user_data
    else:
        user_data_fn = os.path.join(paths.instance_link, 'user-data.txt')
    if args.vendor_data:
        vendor_data_fn = args.vendor_data
    else:
        vendor_data_fn = os.path.join(paths.instance_link, 'vendor-data.txt')

    try:
        instance_json = util.load_file(instance_data_fn)
    except (IOError, OSError) as e:
        if e.errno == EACCES:
            LOG.error("No read permission on '%s'. Try sudo", instance_data_fn)
        else:
            LOG.error('Missing instance-data file: %s', instance_data_fn)
        return 1

    instance_data = util.load_json(instance_json)
    if uid != 0:
        # Non-root callers only get a redaction marker naming the file.
        instance_data['userdata'] = (
            '<%s> file:%s' % (REDACT_SENSITIVE_VALUE, user_data_fn))
        instance_data['vendordata'] = (
            '<%s> file:%s' % (REDACT_SENSITIVE_VALUE, vendor_data_fn))
    else:
        instance_data['userdata'] = load_userdata(user_data_fn)
        instance_data['vendordata'] = load_userdata(vendor_data_fn)
    if args.format:
        payload = '## template: jinja\n{fmt}'.format(fmt=args.format)
        rendered_payload = render_jinja_payload(
            payload=payload, payload_fn='query commandline',
            instance_data=instance_data,
            debug=bool(args.debug))
        if rendered_payload:
            print(rendered_payload)
            return 0
        return 1

    response = convert_jinja_instance_data(instance_data)
    if args.varname:
        try:
            for var in args.varname.split('.'):
                response = response[var]
        except (KeyError, TypeError):
            # KeyError: the key is absent from a dict.
            # TypeError: the path descends into a non-dict value (string,
            # list, number, None), e.g. 'v1.local_hostname.foo'. Both mean
            # the requested key does not exist, so report it the same way
            # instead of crashing with a traceback.
            LOG.error('Undefined instance-data key %s', args.varname)
            return 1
        if args.list_keys:
            if not isinstance(response, dict):
                # var is the last path component walked above.
                LOG.error("--list-keys provided but '%s' is not a dict", var)
                return 1
            response = '\n'.join(sorted(response))
    elif args.list_keys:
        response = '\n'.join(sorted(response))
    # Strings are printed verbatim; everything else is emitted as JSON.
    if not isinstance(response, str):
        response = util.json_dumps(response)
    print(response)
    return 0
def main():
    """Tool to query specific instance-data values.

    Parses the command line with the query parser, runs handle_args and
    exits the process with the status code it returns.
    """
    cli_parser = get_parser()
    cli_args = cli_parser.parse_args()
    exit_status = handle_args(NAME, cli_args)
    sys.exit(exit_status)
# Run the query tool when this module is executed directly as a script.
if __name__ == '__main__':
    main()
# vi: ts=4 expandtab
|