1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741
|
# Copyright: (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import atexit
import decimal
import configparser
import os
import os.path
import sys
import stat
import tempfile
import typing as t
from collections.abc import Mapping, Sequence
from jinja2.nativetypes import NativeEnvironment
from ansible._internal._datatag import _tags
from ansible.errors import AnsibleOptionsError, AnsibleError, AnsibleUndefinedConfigEntry, AnsibleRequiredOptionError
from ansible.module_utils._internal._datatag import AnsibleTagHelper
from ansible.module_utils.common.sentinel import Sentinel
from ansible.module_utils.common.text.converters import to_text, to_bytes, to_native
from ansible.module_utils.common.yaml import yaml_load
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.parsing.quoting import unquote
from ansible.utils.path import cleanup_tmp_file, makedirs_safe, unfrackpath
INTERNAL_DEFS = {'lookup': ('_terms',)}
GALAXY_SERVER_DEF = [
('url', True, 'str'),
('username', False, 'str'),
('password', False, 'str'),
('token', False, 'str'),
('auth_url', False, 'str'),
('api_version', False, 'int'),
('validate_certs', False, 'bool'),
('client_id', False, 'str'),
('client_secret', False, 'str'),
('timeout', False, 'int'),
]
# config definition fields
GALAXY_SERVER_ADDITIONAL = {
'api_version': {'default': None, 'choices': [2, 3]},
'validate_certs': {'cli': [{'name': 'validate_certs'}]},
'timeout': {'cli': [{'name': 'timeout'}]},
'token': {'default': None},
}
@t.runtime_checkable
class _EncryptedStringProtocol(t.Protocol):
"""Protocol representing an `EncryptedString`, since it cannot be imported here."""
# DTFIX-FUTURE: collapse this with the one in collection loader, once we can
def _decrypt(self) -> str: ...
def _get_config_label(plugin_type: str, plugin_name: str, config: str) -> str:
"""Return a label for the given config."""
entry = f'{config!r}'
if plugin_type:
entry += ' for'
if plugin_name:
entry += f' {plugin_name!r}'
entry += f' {plugin_type} plugin'
return entry
def ensure_type(value: object, value_type: str | None, origin: str | None = None, origin_ftype: str | None = None) -> t.Any:
"""
Converts `value` to the requested `value_type`; raises `ValueError` for failed conversions.
Values for `value_type` are:
* boolean/bool: Return a `bool` by applying non-strict `bool` filter rules:
'y', 'yes', 'on', '1', 'true', 't', 1, 1.0, True return True, any other value is False.
* integer/int: Return an `int`. Accepts any `str` parseable by `int` or numeric value with a zero mantissa (including `bool`).
* float: Return a `float`. Accepts any `str` parseable by `float` or numeric value (including `bool`).
* list: Return a `list`. Accepts `list` or `Sequence`. Also accepts, `str`, splitting on ',' while stripping whitespace and unquoting items.
* none: Return `None`. Accepts only the string "None".
* path: Return a resolved path. Accepts `str`.
* temppath/tmppath/tmp: Return a unique temporary directory inside the resolved path specified by the value.
* pathspec: Return a `list` of resolved paths. Accepts a `list` or `Sequence`. Also accepts `str`, splitting on ':'.
* pathlist: Return a `list` of resolved paths. Accepts a `list` or `Sequence`. Also accepts `str`, splitting on `,` while stripping whitespace from paths.
* dictionary/dict: Return a `dict`. Accepts `dict` or `Mapping`.
* string/str: Return a `str`. Accepts `bool`, `int`, `float`, `complex` or `str`.
Path resolution ensures paths are `str` with expansion of '{{CWD}}', environment variables and '~'.
Non-absolute paths are expanded relative to the basedir from `origin`, if specified.
No conversion is performed if `value_type` is unknown or `value` is `None`.
When `origin_ftype` is "ini", a `str` result will be unquoted.
"""
if value is None:
return None
original_value = value
copy_tags = value_type not in ('temppath', 'tmppath', 'tmp')
value = _ensure_type(value, value_type, origin)
if copy_tags and value is not original_value:
if isinstance(value, list):
value = [AnsibleTagHelper.tag_copy(original_value, item) for item in value]
value = AnsibleTagHelper.tag_copy(original_value, value)
if isinstance(value, str) and origin_ftype and origin_ftype == 'ini':
value = unquote(value)
return value
def _ensure_type(value: object, value_type: str | None, origin: str | None = None) -> t.Any:
"""Internal implementation for `ensure_type`, call that function instead."""
original_value = value
basedir = origin if origin and os.path.isabs(origin) and os.path.exists(to_bytes(origin)) else None
if value_type:
value_type = value_type.lower()
match value_type:
case 'boolean' | 'bool':
return boolean(value, strict=False)
case 'integer' | 'int':
if isinstance(value, int): # handle both int and bool (which is an int)
return int(value)
if isinstance(value, (float, str)):
try:
# use Decimal for all other source type conversions; non-zero mantissa is a failure
if (decimal_value := decimal.Decimal(value)) == (int_part := int(decimal_value)):
return int_part
except (decimal.DecimalException, ValueError):
pass
case 'float':
if isinstance(value, float):
return value
if isinstance(value, (int, str)):
try:
return float(value)
except ValueError:
pass
case 'list':
if isinstance(value, list):
return value
if isinstance(value, str):
return [unquote(x.strip()) for x in value.split(',')]
if isinstance(value, Sequence) and not isinstance(value, bytes):
return list(value)
case 'none':
if value == "None":
return None
case 'path':
if isinstance(value, str):
return resolve_path(value, basedir=basedir)
case 'temppath' | 'tmppath' | 'tmp':
if isinstance(value, str):
value = resolve_path(value, basedir=basedir)
if not os.path.exists(value):
makedirs_safe(value, 0o700)
prefix = 'ansible-local-%s' % os.getpid()
value = tempfile.mkdtemp(prefix=prefix, dir=value)
atexit.register(cleanup_tmp_file, value, warn=True)
return value
case 'pathspec':
if isinstance(value, str):
value = value.split(os.pathsep)
if isinstance(value, Sequence) and not isinstance(value, bytes) and all(isinstance(x, str) for x in value):
return [resolve_path(x, basedir=basedir) for x in value]
case 'pathlist':
if isinstance(value, str):
value = [x.strip() for x in value.split(',')]
if isinstance(value, Sequence) and not isinstance(value, bytes) and all(isinstance(x, str) for x in value):
return [resolve_path(x, basedir=basedir) for x in value]
case 'dictionary' | 'dict':
if isinstance(value, dict):
return value
if isinstance(value, Mapping):
return dict(value)
case 'string' | 'str':
if isinstance(value, str):
return value
if isinstance(value, (bool, int, float, complex)):
return str(value)
if isinstance(value, _EncryptedStringProtocol):
return value._decrypt()
case _:
# FIXME: define and document a pass-through value_type (None, 'raw', 'object', '', ...) and then deprecate acceptance of unknown types
return value # return non-str values of unknown value_type as-is
raise ValueError(f'Invalid value provided for {value_type!r}: {original_value!r}')
# FIXME: see if this can live in utils/path
def resolve_path(path: str, basedir: str | None = None) -> str:
""" resolve relative or 'variable' paths """
if '{{CWD}}' in path: # allow users to force CWD using 'magic' {{CWD}}
path = path.replace('{{CWD}}', os.getcwd())
return unfrackpath(path, follow=False, basedir=basedir)
# FIXME: generic file type?
def get_config_type(cfile):
ftype = None
if cfile is not None:
ext = os.path.splitext(cfile)[-1]
if ext in ('.ini', '.cfg'):
ftype = 'ini'
elif ext in ('.yaml', '.yml'):
ftype = 'yaml'
else:
raise AnsibleOptionsError("Unsupported configuration file extension for %s: %s" % (cfile, to_native(ext)))
return ftype
# FIXME: can move to module_utils for use for ini plugins also?
def get_ini_config_value(p, entry):
""" returns the value of last ini entry found """
value = None
if p is not None:
try:
value = p.get(entry.get('section', 'defaults'), entry.get('key', ''), raw=True)
except Exception: # FIXME: actually report issues here
pass
return value
def find_ini_config_file(warnings=None):
""" Load INI Config File order(first found is used): ENV, CWD, HOME, /etc/ansible """
# FIXME: eventually deprecate ini configs
if warnings is None:
# Note: In this case, warnings does nothing
warnings = set()
potential_paths = []
# A value that can never be a valid path so that we can tell if ANSIBLE_CONFIG was set later
# We can't use None because we could set path to None.
# Environment setting
path_from_env = os.getenv("ANSIBLE_CONFIG", Sentinel)
if path_from_env is not Sentinel:
path_from_env = unfrackpath(path_from_env, follow=False)
if os.path.isdir(to_bytes(path_from_env)):
path_from_env = os.path.join(path_from_env, "ansible.cfg")
potential_paths.append(path_from_env)
# Current working directory
warn_cmd_public = False
try:
cwd = os.getcwd()
perms = os.stat(cwd)
cwd_cfg = os.path.join(cwd, "ansible.cfg")
if perms.st_mode & stat.S_IWOTH:
# Working directory is world writable so we'll skip it.
# Still have to look for a file here, though, so that we know if we have to warn
if os.path.exists(cwd_cfg):
warn_cmd_public = True
else:
potential_paths.append(to_text(cwd_cfg, errors='surrogate_or_strict'))
except OSError:
# If we can't access cwd, we'll simply skip it as a possible config source
pass
# Per user location
potential_paths.append(unfrackpath("~/.ansible.cfg", follow=False))
# System location
potential_paths.append("/etc/ansible/ansible.cfg")
for path in potential_paths:
b_path = to_bytes(path)
if os.path.exists(b_path) and os.access(b_path, os.R_OK):
break
else:
path = None
# Emit a warning if all the following are true:
# * We did not use a config from ANSIBLE_CONFIG
# * There's an ansible.cfg in the current working directory that we skipped
if path_from_env != path and warn_cmd_public:
warnings.add(u"Ansible is being run in a world writable directory (%s),"
u" ignoring it as an ansible.cfg source."
u" For more information see"
u" https://docs.ansible.com/ansible/devel/reference_appendices/config.html#cfg-in-world-writable-dir"
% to_text(cwd))
return path
def _add_base_defs_deprecations(base_defs):
"""Add deprecation source 'ansible.builtin' to deprecations in base.yml"""
def process(entry):
if 'deprecated' in entry:
entry['deprecated']['collection_name'] = 'ansible.builtin'
for dummy, data in base_defs.items():
process(data)
for section in ('ini', 'env', 'vars'):
if section in data:
for entry in data[section]:
process(entry)
class ConfigManager:
DEPRECATED = [] # type: list[tuple[str, dict[str, str]]]
WARNINGS = set() # type: set[str]
_errors: list[tuple[str, Exception]]
def __init__(self, conf_file=None, defs_file=None):
self._base_defs = {}
self._plugins = {}
self._parsers = {}
self._config_file = conf_file
self._base_defs = self._read_config_yaml_file(defs_file or ('%s/base.yml' % os.path.dirname(__file__)))
_add_base_defs_deprecations(self._base_defs)
if self._config_file is None:
# set config using ini
self._config_file = find_ini_config_file(self.WARNINGS)
# consume configuration
if self._config_file:
# initialize parser and read config
self._parse_config_file()
self._errors = []
"""Deferred errors that will be turned into warnings."""
# ensure we always have config def entry
self._base_defs['CONFIG_FILE'] = {'default': None, 'type': 'path'}
def load_galaxy_server_defs(self, server_list):
def server_config_def(section, key, required, option_type):
config_def = {
'description': 'The %s of the %s Galaxy server' % (key, section),
'ini': [
{
'section': 'galaxy_server.%s' % section,
'key': key,
}
],
'env': [
{'name': 'ANSIBLE_GALAXY_SERVER_%s_%s' % (section.upper(), key.upper())},
],
'required': required,
'type': option_type,
}
if key in GALAXY_SERVER_ADDITIONAL:
config_def.update(GALAXY_SERVER_ADDITIONAL[key])
# ensure we always have a default timeout
if key == 'timeout' and 'default' not in config_def:
config_def['default'] = self.get_config_value('GALAXY_SERVER_TIMEOUT')
return config_def
if server_list:
for server_key in server_list:
if not server_key:
# To filter out empty strings or non truthy values as an empty server list env var is equal to [''].
continue
# Config definitions are looked up dynamically based on the C.GALAXY_SERVER_LIST entry. We look up the
# section [galaxy_server.<server>] for the values url, username, password, and token.
defs = dict((k, server_config_def(server_key, k, req, value_type)) for k, req, value_type in GALAXY_SERVER_DEF)
self.initialize_plugin_configuration_definitions('galaxy_server', server_key, defs)
def template_default(self, value, variables, key_name: str = '<unknown>'):
if isinstance(value, str) and (value.startswith('{{') and value.endswith('}}')) and variables is not None:
# template default values if possible
# NOTE: cannot use is_template due to circular dep
try:
# FIXME: This really should be using an immutable sandboxed native environment, not just native environment
template = NativeEnvironment().from_string(value)
value = template.render(variables)
except Exception as ex:
self._errors.append((f'Failed to template default for config {key_name}.', ex))
return value
def _read_config_yaml_file(self, yml_file):
# TODO: handle relative paths as relative to the directory containing the current playbook instead of CWD
# Currently this is only used with absolute paths to the `ansible/config` directory
yml_file = to_bytes(yml_file)
if os.path.exists(yml_file):
with open(yml_file, 'rb') as config_def:
return yaml_load(config_def) or {}
raise AnsibleError(
"Missing base YAML definition file (bad install?): %s" % to_native(yml_file))
def _parse_config_file(self, cfile=None):
""" return flat configuration settings from file(s) """
# TODO: take list of files with merge/nomerge
if cfile is None:
cfile = self._config_file
ftype = get_config_type(cfile)
if cfile is not None:
if ftype == 'ini':
self._parsers[cfile] = configparser.ConfigParser(inline_comment_prefixes=(';',))
with open(to_bytes(cfile), 'rb') as f:
try:
cfg_text = to_text(f.read(), errors='surrogate_or_strict')
except UnicodeError as e:
raise AnsibleOptionsError("Error reading config file(%s) because the config file was not utf8 encoded: %s" % (cfile, to_native(e)))
try:
self._parsers[cfile].read_string(cfg_text)
except configparser.Error as e:
raise AnsibleOptionsError("Error reading config file (%s): %s" % (cfile, to_native(e)))
# FIXME: this should eventually handle yaml config files
# elif ftype == 'yaml':
# with open(cfile, 'rb') as config_stream:
# self._parsers[cfile] = yaml_load(config_stream)
else:
raise AnsibleOptionsError("Unsupported configuration file type: %s" % to_native(ftype))
def _find_yaml_config_files(self):
""" Load YAML Config Files in order, check merge flags, keep origin of settings"""
pass
def get_plugin_options(self, plugin_type, name, keys=None, variables=None, direct=None):
options = {}
defs = self.get_configuration_definitions(plugin_type=plugin_type, name=name)
for option in defs:
options[option] = self.get_config_value(option, plugin_type=plugin_type, plugin_name=name, keys=keys, variables=variables, direct=direct)
return options
def get_plugin_vars(self, plugin_type, name):
pvars = []
for pdef in self.get_configuration_definitions(plugin_type=plugin_type, name=name).values():
if 'vars' in pdef and pdef['vars']:
for var_entry in pdef['vars']:
pvars.append(var_entry['name'])
return pvars
def get_plugin_options_from_var(self, plugin_type, name, variable):
options = []
for option_name, pdef in self.get_configuration_definitions(plugin_type=plugin_type, name=name).items():
if 'vars' in pdef and pdef['vars']:
for var_entry in pdef['vars']:
if variable == var_entry['name']:
options.append(option_name)
return options
def get_configuration_definition(self, name, plugin_type=None, plugin_name=None):
ret = {}
if plugin_type is None:
ret = self._base_defs.get(name, None)
elif plugin_name is None:
ret = self._plugins.get(plugin_type, {}).get(name, None)
else:
ret = self._plugins.get(plugin_type, {}).get(plugin_name, {}).get(name, None)
return ret
def has_configuration_definition(self, plugin_type, name):
has = False
if plugin_type in self._plugins:
has = (name in self._plugins[plugin_type])
return has
def get_configuration_definitions(self, plugin_type=None, name=None, ignore_private=False):
""" just list the possible settings, either base or for specific plugins or plugin """
ret = {}
if plugin_type is None:
ret = self._base_defs
elif name is None:
ret = self._plugins.get(plugin_type, {})
else:
ret = self._plugins.get(plugin_type, {}).get(name, {})
if ignore_private: # ignore 'test' config entries, they should not change runtime behaviors
for cdef in list(ret.keys()):
if cdef.startswith('_Z_'):
del ret[cdef]
return ret
def _loop_entries(self, container, entry_list):
""" repeat code for value entry assignment """
value = None
origin = None
for entry in entry_list:
name = entry.get('name')
try:
temp_value = container.get(name, None)
except UnicodeEncodeError:
self.WARNINGS.add(u'value for config entry {0} contains invalid characters, ignoring...'.format(to_text(name)))
continue
if temp_value is not None: # only set if entry is defined in container
value = temp_value
origin = name
# deal with deprecation of setting source, if used
if 'deprecated' in entry:
self.DEPRECATED.append((entry['name'], entry['deprecated']))
return value, origin
def get_config_value(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
""" wrapper """
try:
value, _drop = self.get_config_value_and_origin(config, cfile=cfile, plugin_type=plugin_type, plugin_name=plugin_name,
keys=keys, variables=variables, direct=direct)
except AnsibleError:
raise
except Exception as ex:
raise AnsibleError(f"Unhandled exception when retrieving {config!r}.") from ex
return value
def get_config_default(self, config: str, plugin_type: str | None = None, plugin_name: str | None = None) -> t.Any:
"""Return the default value for the specified configuration."""
return self.get_configuration_definitions(plugin_type, plugin_name)[config]['default']
def get_config_value_and_origin(self, config, cfile=None, plugin_type=None, plugin_name=None, keys=None, variables=None, direct=None):
""" Given a config key figure out the actual value and report on the origin of the settings """
if cfile is None:
# use default config
cfile = self._config_file
if config == 'CONFIG_FILE':
return cfile, ''
# Note: sources that are lists listed in low to high precedence (last one wins)
value = None
origin = None
origin_ftype = None
defs = self.get_configuration_definitions(plugin_type=plugin_type, name=plugin_name)
if config in defs:
aliases = defs[config].get('aliases', [])
# direct setting via plugin arguments, can set to None so we bypass rest of processing/defaults
if direct:
if config in direct:
value = direct[config]
origin = 'Direct'
else:
direct_aliases = [direct[alias] for alias in aliases if alias in direct]
if direct_aliases:
value = direct_aliases[0]
origin = 'Direct'
if value is None and variables and defs[config].get('vars'):
# Use 'variable overrides' if present, highest precedence, but only present when querying running play
value, origin = self._loop_entries(variables, defs[config]['vars'])
origin = 'var: %s' % origin
# use playbook keywords if you have em
if value is None and defs[config].get('keyword') and keys:
value, origin = self._loop_entries(keys, defs[config]['keyword'])
origin = 'keyword: %s' % origin
# automap to keywords
# TODO: deprecate these in favor of explicit keyword above
if value is None and keys:
if config in keys:
value = keys[config]
keyword = config
elif aliases:
for alias in aliases:
if alias in keys:
value = keys[alias]
keyword = alias
break
if value is not None:
origin = 'keyword: %s' % keyword
if value is None and 'cli' in defs[config]:
# avoid circular import .. until valid
from ansible import context
value, origin = self._loop_entries(context.CLIARGS, defs[config]['cli'])
origin = 'cli: %s' % origin
# env vars are next precedence
if value is None and defs[config].get('env'):
value, origin = self._loop_entries(os.environ, defs[config]['env'])
origin = 'env: %s' % origin
# try config file entries next, if we have one
if self._parsers.get(cfile, None) is None:
self._parse_config_file(cfile)
# attempt to read from config file
if value is None and cfile is not None:
ftype = get_config_type(cfile)
if ftype and defs[config].get(ftype):
try:
for entry in defs[config][ftype]:
# load from config
if ftype == 'ini':
temp_value = get_ini_config_value(self._parsers[cfile], entry)
elif ftype == 'yaml':
raise AnsibleError('YAML configuration type has not been implemented yet')
else:
raise AnsibleError('Invalid configuration file type: %s' % ftype)
if temp_value is not None:
# set value and origin
value = temp_value
origin = cfile
origin_ftype = ftype
if 'deprecated' in entry:
if ftype == 'ini':
self.DEPRECATED.append(('[%s]%s' % (entry['section'], entry['key']), entry['deprecated']))
else:
raise AnsibleError('Unimplemented file type: %s' % ftype)
except Exception as e:
sys.stderr.write("Error while loading config %s: %s" % (cfile, to_native(e)))
# set default if we got here w/o a value
if value is None:
if defs[config].get('required', False):
if not plugin_type or config not in INTERNAL_DEFS.get(plugin_type, {}):
raise AnsibleRequiredOptionError(f"Required config {_get_config_label(plugin_type, plugin_name, config)} not provided.")
else:
origin = 'default'
value = self.template_default(defs[config].get('default'), variables, key_name=_get_config_label(plugin_type, plugin_name, config))
try:
# ensure correct type, can raise exceptions on mismatched types
value = ensure_type(value, defs[config].get('type'), origin=origin, origin_ftype=origin_ftype)
except ValueError as ex:
if origin.startswith('env:') and value == '':
# this is empty env var for non string so we can set to default
origin = 'default'
value = ensure_type(defs[config].get('default'), defs[config].get('type'), origin=origin, origin_ftype=origin_ftype)
else:
raise AnsibleOptionsError(f'Config {_get_config_label(plugin_type, plugin_name, config)} from {origin!r} has an invalid value.') from ex
# deal with restricted values
if value is not None and 'choices' in defs[config] and defs[config]['choices'] is not None:
invalid_choices = True # assume the worst!
if defs[config].get('type') == 'list':
# for a list type, compare all values in type are allowed
invalid_choices = not all(choice in defs[config]['choices'] for choice in value)
else:
# these should be only the simple data types (string, int, bool, float, etc) .. ignore dicts for now
invalid_choices = value not in defs[config]['choices']
if invalid_choices:
if isinstance(defs[config]['choices'], Mapping):
valid = ', '.join([to_text(k) for k in defs[config]['choices'].keys()])
elif isinstance(defs[config]['choices'], str):
valid = defs[config]['choices']
elif isinstance(defs[config]['choices'], Sequence):
valid = ', '.join([to_text(c) for c in defs[config]['choices']])
else:
valid = defs[config]['choices']
raise AnsibleOptionsError(f'Invalid value {value!r} for config {_get_config_label(plugin_type, plugin_name, config)}.',
help_text=f'Valid values are: {valid}')
# deal with deprecation of the setting
if 'deprecated' in defs[config] and origin != 'default':
self.DEPRECATED.append((config, defs[config].get('deprecated')))
else:
raise AnsibleUndefinedConfigEntry(f'No config definition exists for {_get_config_label(plugin_type, plugin_name, config)}.')
if not _tags.Origin.is_tagged_on(value):
value = _tags.Origin(description=f'<Config {origin}>').tag(value)
return value, origin
def initialize_plugin_configuration_definitions(self, plugin_type, name, defs):
if plugin_type not in self._plugins:
self._plugins[plugin_type] = {}
self._plugins[plugin_type][name] = defs
@staticmethod
def get_deprecated_msg_from_config(dep_docs, include_removal=False, collection_name=None):
removal = ''
if include_removal:
if 'removed_at_date' in dep_docs:
removal = f"Will be removed in a release after {dep_docs['removed_at_date']}\n\t"
elif collection_name:
removal = f"Will be removed in: {collection_name} {dep_docs['removed_in']}\n\t"
else:
removal = f"Will be removed in: Ansible {dep_docs['removed_in']}\n\t"
# TODO: choose to deprecate either singular or plural
alt = dep_docs.get('alternatives', dep_docs.get('alternative', 'none'))
return f"Reason: {dep_docs['why']}\n\t{removal}Alternatives: {alt}"
|