1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
|
# Copyright: (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com>
# Copyright: (c) 2017, Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import decimal
import itertools
import operator
import os
import typing as t
from copy import copy as shallowcopy
from functools import cache
from ansible import constants as C
from ansible import context
from ansible.errors import AnsibleError, AnsibleParserError, AnsibleAssertionError, AnsibleValueOmittedError, AnsibleFieldAttributeError
from ansible.module_utils.datatag import native_type_name
from ansible._internal._datatag._tags import Origin
from ansible.module_utils.six import string_types
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.common.sentinel import Sentinel
from ansible.module_utils.common.text.converters import to_text
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.attribute import Attribute, FieldAttribute, ConnectionFieldAttribute, NonInheritableFieldAttribute
from ansible.plugins.loader import module_loader, action_loader
from ansible.utils.collection_loader._collection_finder import _get_collection_metadata, AnsibleCollectionRef
from ansible.utils.display import Display
from ansible.utils.vars import combine_vars, get_unique_id, validate_variable_name
from ansible._internal._templating._engine import TemplateEngine
display = Display()
def _validate_action_group_metadata(action, found_group_metadata, fq_group_name):
valid_metadata = {
'extend_group': {
'types': (list, string_types,),
'errortype': 'list',
},
}
metadata_warnings = []
validate = C.VALIDATE_ACTION_GROUP_METADATA
metadata_only = isinstance(action, dict) and 'metadata' in action and len(action) == 1
if validate and not metadata_only:
found_keys = ', '.join(sorted(list(action)))
metadata_warnings.append("The only expected key is metadata, but got keys: {keys}".format(keys=found_keys))
elif validate:
if found_group_metadata:
metadata_warnings.append("The group contains multiple metadata entries.")
if not isinstance(action['metadata'], dict):
metadata_warnings.append("The metadata is not a dictionary. Got {metadata}".format(metadata=action['metadata']))
else:
unexpected_keys = set(action['metadata'].keys()) - set(valid_metadata.keys())
if unexpected_keys:
metadata_warnings.append("The metadata contains unexpected keys: {0}".format(', '.join(unexpected_keys)))
unexpected_types = []
for field, requirement in valid_metadata.items():
if field not in action['metadata']:
continue
value = action['metadata'][field]
if not isinstance(value, requirement['types']):
unexpected_types.append("%s is %s (expected type %s)" % (field, value, requirement['errortype']))
if unexpected_types:
metadata_warnings.append("The metadata contains unexpected key types: {0}".format(', '.join(unexpected_types)))
if metadata_warnings:
metadata_warnings.insert(0, "Invalid metadata was found for action_group {0} while loading module_defaults.".format(fq_group_name))
display.warning(" ".join(metadata_warnings))
class _ClassProperty:
def __set_name__(self, owner, name):
self.name = name
def __get__(self, obj, objtype=None):
return getattr(objtype, f'_{self.name}')()
class FieldAttributeBase:
_post_validate_object = False
"""
`False` skips FieldAttribute post-validation on intermediate objects and mixins for attributes without `always_post_validate`.
Leaf objects (e.g., `Task`) should set this attribute `True` to opt-in to post-validation.
"""
fattributes = _ClassProperty()
@classmethod
@cache
def _fattributes(cls):
fattributes = {}
for class_obj in reversed(cls.__mro__):
for name, attr in list(class_obj.__dict__.items()):
if not isinstance(attr, Attribute):
continue
fattributes[name] = attr
if attr.alias:
setattr(class_obj, attr.alias, attr)
fattributes[attr.alias] = attr
return fattributes
def __init__(self) -> None:
# initialize the data loader and variable manager, which will be provided
# later when the object is actually loaded
self._loader = None
self._variable_manager = None
self._origin: Origin | None = None
# other internal params
self._validated = False
self._squashed = False
self._finalized = False
# every object gets a random uuid:
self._uuid = get_unique_id()
@property
def finalized(self):
return self._finalized
def dump_me(self, depth=0):
""" this is never called from production code, it is here to be used when debugging as a 'complex print' """
if depth == 0:
display.debug("DUMPING OBJECT ------------------------------------------------------")
display.debug("%s- %s (%s, id=%s)" % (" " * depth, self.__class__.__name__, self, id(self)))
if hasattr(self, '_parent') and self._parent:
self._parent.dump_me(depth + 2)
dep_chain = self._parent.get_dep_chain()
if dep_chain:
for dep in dep_chain:
dep.dump_me(depth + 2)
if hasattr(self, '_play') and self._play:
self._play.dump_me(depth + 2)
def preprocess_data(self, ds):
""" infrequently used method to do some pre-processing of legacy terms """
return ds
def load_data(self, ds, variable_manager=None, loader=None):
""" walk the input datastructure and assign any values """
if ds is None:
raise AnsibleAssertionError('ds (%s) should not be None but it is.' % ds)
# cache the datastructure internally
setattr(self, '_ds', ds)
# the variable manager class is used to manage and merge variables
# down to a single dictionary for reference in templating, etc.
self._variable_manager = variable_manager
self._origin = Origin.get_tag(ds)
# the data loader class is used to parse data from strings and files
if loader is not None:
self._loader = loader
else:
self._loader = DataLoader()
# call the preprocess_data() function to massage the data into
# something we can more easily parse, and then call the validation
# function on it to ensure there are no incorrect key values
ds = self.preprocess_data(ds)
self._validate_attributes(ds)
# Walk all attributes in the class. We sort them based on their priority
# so that certain fields can be loaded before others, if they are dependent.
for name, attr in sorted(self.fattributes.items(), key=operator.itemgetter(1)):
# copy the value over unless a _load_field method is defined
if name in ds:
method = getattr(self, '_load_%s' % name, None)
if method:
setattr(self, name, method(name, ds[name]))
else:
setattr(self, name, ds[name])
# run early, non-critical validation
self.validate()
# return the constructed object
return self
def get_ds(self):
try:
return getattr(self, '_ds')
except AttributeError:
return None
def get_loader(self):
return self._loader
def get_variable_manager(self):
return self._variable_manager
def _post_validate_debugger(self, attr, value, templar):
try:
value = templar.template(value)
except AnsibleValueOmittedError:
value = self.set_to_context(attr.name)
valid_values = frozenset(('always', 'on_failed', 'on_unreachable', 'on_skipped', 'never'))
if value and isinstance(value, string_types) and value not in valid_values:
raise AnsibleParserError("'%s' is not a valid value for debugger. Must be one of %s" % (value, ', '.join(valid_values)), obj=self.get_ds())
return value
def _validate_attributes(self, ds):
"""
Ensures that there are no keys in the datastructure which do
not map to attributes for this object.
"""
valid_attrs = frozenset(self.fattributes)
for key in ds:
if key not in valid_attrs:
raise AnsibleParserError("'%s' is not a valid attribute for a %s" % (key, self.__class__.__name__), obj=key)
def validate(self, all_vars=None):
""" validation that is done at parse time, not load time """
all_vars = {} if all_vars is None else all_vars
if not self._validated:
# walk all fields in the object
for (name, attribute) in self.fattributes.items():
# run validator only if present
method = getattr(self, '_validate_%s' % name, None)
if method:
method(attribute, name, getattr(self, name))
else:
# and make sure the attribute is of the type it should be
value = getattr(self, f'_{name}', Sentinel)
if value is not None:
if attribute.isa == 'string' and isinstance(value, (list, dict)):
raise AnsibleParserError(
"The field '%s' is supposed to be a string type,"
" however the incoming data structure is a %s" % (name, type(value)), obj=self.get_ds()
)
self._validated = True
def _load_module_defaults(self, name, value):
if value is None:
return
if not isinstance(value, list):
value = [value]
validated_module_defaults = []
for defaults_dict in value:
if not isinstance(defaults_dict, dict):
raise AnsibleParserError(
"The field 'module_defaults' is supposed to be a dictionary or list of dictionaries, "
"the keys of which must be static action, module, or group names. Only the values may contain "
"templates. For example: {'ping': \"{{ ping_defaults }}\"}",
obj=defaults_dict,
)
validated_defaults_dict = {}
for defaults_entry, defaults in defaults_dict.items():
# module_defaults do not use the 'collections' keyword, so actions and
# action_groups that are not fully qualified are part of the 'ansible.legacy'
# collection. Update those entries here, so module_defaults contains
# fully qualified entries.
if defaults_entry.startswith('group/'):
group_name = defaults_entry.split('group/')[-1]
# The resolved action_groups cache is associated saved on the current Play
if self.play is not None:
group_name, dummy = self._resolve_group(group_name)
defaults_entry = 'group/' + group_name
validated_defaults_dict[defaults_entry] = defaults
else:
if len(defaults_entry.split('.')) < 3:
defaults_entry = 'ansible.legacy.' + defaults_entry
resolved_action = self._resolve_action(defaults_entry)
if resolved_action:
validated_defaults_dict[resolved_action] = defaults
# If the defaults_entry is an ansible.legacy plugin, these defaults
# are inheritable by the 'ansible.builtin' subset, but are not
# required to exist.
if defaults_entry.startswith('ansible.legacy.'):
resolved_action = self._resolve_action(
defaults_entry.replace('ansible.legacy.', 'ansible.builtin.'),
mandatory=False
)
if resolved_action:
validated_defaults_dict[resolved_action] = defaults
validated_module_defaults.append(validated_defaults_dict)
return validated_module_defaults
@property
def play(self):
if hasattr(self, '_play'):
play = self._play
elif hasattr(self, '_parent') and hasattr(self._parent, '_play'):
play = self._parent._play
else:
play = self
if play.__class__.__name__ != 'Play':
# Should never happen, but handle gracefully by returning None, just in case
return None
return play
def _resolve_group(self, fq_group_name, mandatory=True):
if not AnsibleCollectionRef.is_valid_fqcr(fq_group_name):
collection_name = 'ansible.builtin'
fq_group_name = collection_name + '.' + fq_group_name
else:
collection_name = '.'.join(fq_group_name.split('.')[0:2])
# Check if the group has already been resolved and cached
if fq_group_name in self.play._group_actions:
return fq_group_name, self.play._group_actions[fq_group_name]
try:
action_groups = _get_collection_metadata(collection_name).get('action_groups', {})
except ValueError:
if not mandatory:
display.vvvvv("Error loading module_defaults: could not resolve the module_defaults group %s" % fq_group_name)
return fq_group_name, []
raise AnsibleParserError("Error loading module_defaults: could not resolve the module_defaults group %s" % fq_group_name)
# The collection may or may not use the fully qualified name
# Don't fail if the group doesn't exist in the collection
resource_name = fq_group_name.split(collection_name + '.')[-1]
action_group = action_groups.get(
fq_group_name,
action_groups.get(resource_name)
)
if action_group is None:
if not mandatory:
display.vvvvv("Error loading module_defaults: could not resolve the module_defaults group %s" % fq_group_name)
return fq_group_name, []
raise AnsibleParserError("Error loading module_defaults: could not resolve the module_defaults group %s" % fq_group_name)
resolved_actions = []
include_groups = []
found_group_metadata = False
for action in action_group:
# Everything should be a string except the metadata entry
if not isinstance(action, string_types):
_validate_action_group_metadata(action, found_group_metadata, fq_group_name)
if isinstance(action['metadata'], dict):
found_group_metadata = True
include_groups = action['metadata'].get('extend_group', [])
if isinstance(include_groups, string_types):
include_groups = [include_groups]
if not isinstance(include_groups, list):
# Bad entries may be a warning above, but prevent tracebacks by setting it back to the acceptable type.
include_groups = []
continue
# The collection may or may not use the fully qualified name.
# If not, it's part of the current collection.
if not AnsibleCollectionRef.is_valid_fqcr(action):
action = collection_name + '.' + action
resolved_action = self._resolve_action(action, mandatory=False)
if resolved_action:
resolved_actions.append(resolved_action)
for action in resolved_actions:
if action not in self.play._action_groups:
self.play._action_groups[action] = []
self.play._action_groups[action].append(fq_group_name)
self.play._group_actions[fq_group_name] = resolved_actions
# Resolve extended groups last, after caching the group in case they recursively refer to each other
for include_group in include_groups:
if not AnsibleCollectionRef.is_valid_fqcr(include_group):
include_group = collection_name + '.' + include_group
dummy, group_actions = self._resolve_group(include_group, mandatory=False)
for action in group_actions:
if action not in self.play._action_groups:
self.play._action_groups[action] = []
self.play._action_groups[action].append(fq_group_name)
self.play._group_actions[fq_group_name].extend(group_actions)
resolved_actions.extend(group_actions)
return fq_group_name, resolved_actions
def _resolve_action(self, action_name, mandatory=True):
context = module_loader.find_plugin_with_context(action_name, ignore_deprecated=(not mandatory))
if context.resolved and not context.action_plugin:
prefer = action_loader.find_plugin_with_context(action_name, ignore_deprecated=(not mandatory))
if prefer.resolved:
context = prefer
elif not context.resolved:
context = action_loader.find_plugin_with_context(action_name, ignore_deprecated=(not mandatory))
if context.resolved:
return context.resolved_fqcn
if mandatory:
raise AnsibleParserError("Could not resolve action %s in module_defaults" % action_name)
display.vvvvv("Could not resolve action %s in module_defaults" % action_name)
def squash(self):
"""
Evaluates all attributes and sets them to the evaluated version,
so that all future accesses of attributes do not need to evaluate
parent attributes.
"""
if not self._squashed:
for name in self.fattributes:
setattr(self, name, getattr(self, name))
self._squashed = True
def copy(self):
"""
Create a copy of this object and return it.
"""
try:
new_me = self.__class__()
except RecursionError as ex:
raise AnsibleError("Exceeded maximum object depth. This may have been caused by excessive role recursion.") from ex
for name in self.fattributes:
setattr(new_me, name, shallowcopy(getattr(self, f'_{name}', Sentinel)))
new_me._loader = self._loader
new_me._variable_manager = self._variable_manager
new_me._origin = self._origin
new_me._validated = self._validated
new_me._finalized = self._finalized
new_me._uuid = self._uuid
# if the ds value was set on the object, copy it to the new copy too
if hasattr(self, '_ds'):
new_me._ds = self._ds
return new_me
def get_validated_value(self, name, attribute, value, templar):
try:
return self._get_validated_value(name, attribute, value, templar)
except (TypeError, ValueError):
raise AnsibleError(f"The value {value!r} could not be converted to {attribute.isa!r}.", obj=value)
def _get_validated_value(self, name, attribute, value, templar):
if attribute.isa == 'string':
value = to_text(value)
elif attribute.isa == 'int':
if not isinstance(value, int):
try:
if (decimal_value := decimal.Decimal(value)) != (int_value := int(decimal_value)):
raise decimal.DecimalException(f'Floating-point value {value!r} would be truncated.')
value = int_value
except decimal.DecimalException as e:
raise ValueError from e
elif attribute.isa == 'float':
value = float(value)
elif attribute.isa == 'bool':
value = boolean(value, strict=True)
elif attribute.isa == 'percent':
# special value, which may be an integer or float
# with an optional '%' at the end
if isinstance(value, string_types) and '%' in value:
value = value.replace('%', '')
value = float(value)
elif attribute.isa == 'list':
if value is None:
value = []
elif not isinstance(value, list):
value = [value]
if attribute.listof is not None:
for item in value:
if not isinstance(item, attribute.listof):
type_names = ' or '.join(f'{native_type_name(attribute_type)!r}' for attribute_type in attribute.listof)
raise AnsibleParserError(
message=f"Keyword {name!r} items must be of type {type_names}, not {native_type_name(item)!r}.",
obj=Origin.first_tagged_on(item, value, self.get_ds()),
)
elif attribute.required and attribute.listof == (str,):
if item is None or item.strip() == "":
raise AnsibleParserError(
message=f"Keyword {name!r} is required, and cannot have empty values.",
obj=Origin.first_tagged_on(item, value, self.get_ds()),
)
elif attribute.isa == 'dict':
if value is None:
value = dict()
elif not isinstance(value, dict):
raise AnsibleError(f"{value!r} is not a dictionary")
elif attribute.isa == 'class':
if not isinstance(value, attribute.class_type):
raise TypeError("%s is not a valid %s (got a %s instead)" % (name, attribute.class_type, type(value)))
value.post_validate(templar=templar)
else:
raise AnsibleAssertionError(f"Unknown value for attribute.isa: {attribute.isa}")
return value
def set_to_context(self, name: str) -> t.Any:
""" set to parent inherited value or Sentinel as appropriate"""
attribute = self.fattributes[name]
if isinstance(attribute, NonInheritableFieldAttribute):
# setting to sentinel will trigger 'default/default()' on getter
value = Sentinel
else:
try:
value = self._get_parent_attribute(name, omit=True)
except AttributeError:
# mostly playcontext as only tasks/handlers/blocks really resolve parent
value = Sentinel
setattr(self, name, value)
return value
def post_validate(self, templar):
"""
we can't tell that everything is of the right type until we have
all the variables. Run basic types (from isa) as well as
any _post_validate_<foo> functions.
"""
for name in self.fattributes:
value = self.post_validate_attribute(name, templar=templar)
if value is not Sentinel:
# and assign the massaged value back to the attribute field
setattr(self, name, value)
self._finalized = True
def post_validate_attribute(self, name: str, *, templar: TemplateEngine):
attribute: FieldAttribute = self.fattributes[name]
# DTFIX-FUTURE: this can probably be used in many getattr cases below, but the value may be out-of-date in some cases
original_value = getattr(self, name) # we save this original (likely Origin-tagged) value to pass as `obj` for errors
if attribute.static:
value = getattr(self, name)
# we don't template 'vars' but allow template as values for later use
if name not in ('vars',) and templar.is_template(value):
display.warning('"%s" is not templatable, but we found: %s, '
'it will not be templated and will be used "as is".' % (name, value))
return Sentinel
if getattr(self, name) is None:
if not attribute.required:
return Sentinel
raise AnsibleFieldAttributeError(f'The field {name!r} is required but was not set.', obj=self.get_ds())
from .role_include import IncludeRole
if not attribute.always_post_validate and isinstance(self, IncludeRole) and self.statically_loaded: # import_role
# normal field attributes should not go through post validation on import_role/import_tasks
# only import_role is checked here because import_tasks never reaches this point
return Sentinel
# Skip post validation unless always_post_validate is True, or the object requires post validation.
if not attribute.always_post_validate and not self._post_validate_object:
# Intermediate objects like Play() won't have their fields validated by
# default, as their values are often inherited by other objects and validated
# later, so we don't want them to fail out early
return Sentinel
try:
# Run the post-validator if present. These methods are responsible for
# using the given templar to template the values, if required.
method = getattr(self, '_post_validate_%s' % name, None)
if method:
value = method(attribute, getattr(self, name), templar)
elif attribute.isa == 'class':
value = getattr(self, name)
else:
try:
# if the attribute contains a variable, template it now
value = templar.template(getattr(self, name))
except AnsibleValueOmittedError:
# If this evaluated to the omit value, set the value back to inherited by context
# or default specified in the FieldAttribute and move on
value = self.set_to_context(name)
if value is Sentinel:
return value
# and make sure the attribute is of the type it should be
if value is not None:
value = self.get_validated_value(name, attribute, value, templar)
# returning the value results in assigning the massaged value back to the attribute field
return value
except Exception as ex:
if name == 'args':
raise # no useful information to contribute, raise the original exception
raise AnsibleFieldAttributeError(f'Error processing keyword {name!r}.', obj=original_value) from ex
def _load_vars(self, attr, ds):
"""
Vars in a play must be specified as a dictionary.
"""
try:
if isinstance(ds, dict):
for key in ds:
validate_variable_name(key)
return combine_vars(self.vars, ds)
elif ds is None:
return {}
else:
raise ValueError
except ValueError as ex:
raise AnsibleParserError(f"Vars in a {self.__class__.__name__} must be specified as a dictionary.", obj=ds) from ex
except TypeError as ex:
raise AnsibleParserError(f"Invalid variable name in vars specified for {self.__class__.__name__}.", obj=ds) from ex
def _extend_value(self, value, new_value, prepend=False):
"""
Will extend the value given with new_value (and will turn both
into lists if they are not so already). The values are run through
a set to remove duplicate values.
"""
if not isinstance(value, list):
value = [value]
if not isinstance(new_value, list):
new_value = [new_value]
# Due to where _extend_value may run for some attributes
# it is possible to end up with Sentinel in the list of values
# ensure we strip them
value = [v for v in value if v is not Sentinel]
new_value = [v for v in new_value if v is not Sentinel]
if prepend:
combined = new_value + value
else:
combined = value + new_value
return [i for i, dummy in itertools.groupby(combined) if i is not None]
def dump_attrs(self):
"""
Dumps all attributes to a dictionary
"""
attrs = {}
for (name, attribute) in self.fattributes.items():
attr = getattr(self, name)
if attribute.isa == 'class' and hasattr(attr, 'serialize'):
attrs[name] = attr.serialize()
else:
attrs[name] = attr
return attrs
def from_attrs(self, attrs):
"""
Loads attributes from a dictionary
"""
for (attr, value) in attrs.items():
if attr in self.fattributes:
attribute = self.fattributes[attr]
if attribute.isa == 'class' and isinstance(value, dict):
obj = attribute.class_type()
obj.deserialize(value)
setattr(self, attr, obj)
else:
setattr(self, attr, value)
else:
setattr(self, attr, value) # overridden dump_attrs in derived types may dump attributes which are not field attributes
# from_attrs is only used to create a finalized task
# from attrs from the Worker/TaskExecutor
# Those attrs are finalized and squashed in the TE
# and controller side use needs to reflect that
self._finalized = True
self._squashed = True
def serialize(self):
"""
Serializes the object derived from the base object into
a dictionary of values. This only serializes the field
attributes for the object, so this may need to be overridden
for any classes which wish to add additional items not stored
as field attributes.
"""
repr = self.dump_attrs()
# serialize the uuid field
repr['uuid'] = self._uuid
repr['finalized'] = self._finalized
repr['squashed'] = self._squashed
return repr
def deserialize(self, data):
"""
Given a dictionary of values, load up the field attributes for
this object. As with serialize(), if there are any non-field
attribute data members, this method will need to be overridden
and extended.
"""
if not isinstance(data, dict):
raise AnsibleAssertionError('data (%s) should be a dict but is a %s' % (data, type(data)))
for (name, attribute) in self.fattributes.items():
if name in data:
setattr(self, name, data[name])
else:
self.set_to_context(name)
# restore the UUID field
setattr(self, '_uuid', data.get('uuid'))
self._finalized = data.get('finalized', False)
self._squashed = data.get('squashed', False)
class Base(FieldAttributeBase):
name = NonInheritableFieldAttribute(isa='string', default='', always_post_validate=True)
# connection/transport
connection = ConnectionFieldAttribute(isa='string', default=context.cliargs_deferred_get('connection'))
port = FieldAttribute(isa='int')
remote_user = FieldAttribute(isa='string', default=context.cliargs_deferred_get('remote_user'))
# variables
vars = NonInheritableFieldAttribute(isa='dict', priority=100, static=True, default=dict)
# module default params
module_defaults = FieldAttribute(isa='list', extend=True, prepend=True)
# flags and misc. settings
environment = FieldAttribute(isa='list', extend=True, prepend=True)
no_log = FieldAttribute(isa='bool', default=C.DEFAULT_NO_LOG)
run_once = FieldAttribute(isa='bool')
ignore_errors = FieldAttribute(isa='bool')
ignore_unreachable = FieldAttribute(isa='bool')
check_mode = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('check'))
diff = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('diff'))
any_errors_fatal = FieldAttribute(isa='bool', default=C.ANY_ERRORS_FATAL)
throttle = FieldAttribute(isa='int', default=0)
timeout = FieldAttribute(isa='int', default=C.TASK_TIMEOUT)
# explicitly invoke a debugger on tasks
debugger = FieldAttribute(isa='string')
# Privilege escalation
become = FieldAttribute(isa='bool', default=context.cliargs_deferred_get('become'))
become_method = FieldAttribute(isa='string', default=context.cliargs_deferred_get('become_method'))
become_user = FieldAttribute(isa='string', default=context.cliargs_deferred_get('become_user'))
become_flags = FieldAttribute(isa='string', default=context.cliargs_deferred_get('become_flags'))
become_exe = FieldAttribute(isa='string', default=context.cliargs_deferred_get('become_exe'))
# used to hold sudo/su stuff
DEPRECATED_ATTRIBUTES = [] # type: list[str]
def update_result_no_log(self, templar: TemplateEngine, result: dict[str, t.Any]) -> None:
"""Set the post-validated no_log value for the result, falling back to a default on validation/templating failure with a warning."""
if self.finalized:
no_log = self.no_log
else:
try:
no_log = self.post_validate_attribute('no_log', templar=templar)
except Exception as ex:
display.error_as_warning('Invalid no_log value for task, output will be masked.', exception=ex)
no_log = True
result_no_log = result.get('_ansible_no_log', False)
if not isinstance(result_no_log, bool):
display.warning(f'Invalid _ansible_no_log value of type {type(result_no_log).__name__!r} in task result, output will be masked.')
no_log = True
no_log = no_log or result_no_log
result.update(_ansible_no_log=no_log)
def get_path(self) -> str:
""" return the absolute path of the playbook object and its line number """
origin = self._origin
if not origin:
try:
origin = self._parent._play._origin
except AttributeError:
pass
if origin and origin.path:
path = f"{origin.path}:{origin.line_num or 1}"
else:
path = ""
return path
def get_dep_chain(self):
if hasattr(self, '_parent') and self._parent:
return self._parent.get_dep_chain()
else:
return None
def get_search_path(self):
"""
Return the list of paths you should search for files, in order.
This follows role/playbook dependency chain.
"""
path_stack = []
dep_chain = self.get_dep_chain()
# inside role: add the dependency chain from current to dependent
if dep_chain:
path_stack.extend(reversed([x._role_path for x in dep_chain if hasattr(x, '_role_path')]))
# add path of task itself, unless it is already in the list
task_dir = os.path.dirname(self.get_path())
if task_dir not in path_stack:
path_stack.append(task_dir)
return path_stack
|