1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
# Copyright (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
import os
import re
from collections.abc import MutableMapping, MutableSequence
from ansible import constants as C
from ansible.errors import AnsibleError
from ansible.module_utils import six
from ansible.plugins.loader import connection_loader
from ansible.utils.display import Display
# Module-level Display instance; the functions below use it to emit warnings
# whenever they drop a key from module-supplied data.
display = Display()
def module_response_deepcopy(v):
    """Create a deep copy of module response data.

    Designed to be used within the Ansible "engine" to improve performance
    issues where ``copy.deepcopy`` was used previously, largely with CPU
    and memory contention.

    This only supports the following data types, and was designed to only
    handle specific workloads:

    * ``dict``
    * ``list``

    The data we pass here will come from a serialization such
    as JSON, so we shouldn't have need for other data types such as
    ``set`` or ``tuple``.

    Take note that this function should not be used extensively as a
    replacement for ``deepcopy`` due to the naive way in which this
    handles other data types.

    Do not expect uses outside of those listed below to maintain
    backwards compatibility, in case we need to extend this function
    to handle our specific needs:

    * ``ansible.executor.task_result._RawTaskResult.as_callback_task_result``
    * ``ansible.vars.clean.clean_facts``
    * ``ansible.vars.namespace_facts``

    :param v: the value to copy.
    :returns: a new ``dict``/``list`` whose nested ``dict``/``list`` values
        are recursively copied; any other value (including ``v`` itself when
        it is neither a ``dict`` nor a ``list``) is returned/shared as-is.
    """
    if isinstance(v, dict):
        ret = v.copy()
        items = ret.items()
    elif isinstance(v, list):
        ret = v[:]
        items = enumerate(ret)
    else:
        # Anything that is not a dict or list is handed back uncopied.
        return v

    # The shallow copy already holds every non-container value, so only the
    # nested containers need to be replaced. Rebinding the value of an
    # existing key/index does not resize the container, which keeps this safe
    # while iterating over it.
    for key, value in items:
        if isinstance(value, (dict, list)):
            ret[key] = module_response_deepcopy(value)

    return ret
def strip_internal_keys(dirty, exceptions=None):
    """Recursively delete ``_ansible_``-prefixed keys from ``dirty`` in place.

    All string keys starting with ``_ansible_`` are treated as internal and are
    removed from every mapping reachable through nested mutable mappings and
    mutable sequences, unless the key is listed in ``exceptions``.

    :param dirty: a mutable mapping or mutable sequence to clean; it is
        modified in place.
    :param exceptions: optional container of key names that must be kept even
        though they start with ``_ansible_``. Defaults to no exceptions.
    :returns: the same ``dirty`` object, after modification.
    :raises AnsibleError: if ``dirty`` is neither a mutable mapping nor a
        mutable sequence.
    """
    if exceptions is None:
        exceptions = ()

    if isinstance(dirty, MutableSequence):
        for element in dirty:
            if isinstance(element, (MutableMapping, MutableSequence)):
                strip_internal_keys(element, exceptions=exceptions)
    elif isinstance(dirty, MutableMapping):
        # listify to avoid updating dict while iterating over it
        for k in list(dirty.keys()):
            # Only string keys can carry the internal prefix; other key types
            # are kept and their values are still descended into.
            if isinstance(k, str) and k.startswith('_ansible_') and k not in exceptions:
                del dirty[k]
                continue

            if isinstance(dirty[k], (MutableMapping, MutableSequence)):
                strip_internal_keys(dirty[k], exceptions=exceptions)
    else:
        raise AnsibleError("Cannot strip invalid keys from %s" % type(dirty))

    return dirty
def remove_internal_keys(data):
    """
    More nuanced version of strip_internal_keys

    Works on ``data`` in place and returns nothing:

    * top-level keys that start with ``_ansible_`` (other than
      ``_ansible_parsed``) or that appear in ``C.INTERNAL_RESULT_KEYS`` are
      deleted, with a warning for each one;
    * ``warnings`` / ``deprecations`` entries that are present but empty are
      deleted;
    * interpreter-discovery entries inside ``ansible_facts`` are deleted.
    """
    for name in list(data):
        is_internal = (name.startswith('_ansible_') and name != '_ansible_parsed') or name in C.INTERNAL_RESULT_KEYS
        if not is_internal:
            continue
        display.warning("Removed unexpected internal key in module return: %s = %s" % (name, data[name]))
        del data[name]

    # remove bad/empty internal keys
    for optional in ('warnings', 'deprecations'):
        if optional not in data:
            continue
        if not data[optional]:
            del data[optional]

    # cleanse fact values that are allowed from actions but not modules
    interpreter_prefixes = ('discovered_interpreter_', 'ansible_discovered_interpreter_')
    for fact_name in list(data.get('ansible_facts', {}).keys()):
        if fact_name.startswith(interpreter_prefixes):
            del data['ansible_facts'][fact_name]
def clean_facts(facts):
    """ remove facts that can override internal keys or otherwise deemed unsafe

    Operates on a copy of ``facts`` made with ``module_response_deepcopy``;
    every removed key is reported through ``display.warning``. The cleaned
    copy is finally passed through ``strip_internal_keys`` and returned.
    """
    cleaned = module_response_deepcopy(facts)
    present = set(cleaned.keys())
    doomed = set()

    # first we collect all of our magic variable names that show up in the facts
    # NOTE: these will eventually disappear in favor of others below
    for magic_name in C.MAGIC_VARIABLE_MAPPING:
        aliases = C.MAGIC_VARIABLE_MAPPING[magic_name]
        doomed.update(present.intersection(aliases))

    # common connection vars
    doomed.update(present.intersection(C.COMMON_CONNECTION_VARS))

    # connection plugin specific vars (plus become vars)
    # most lightweight VM or container tech creates devices with a
    # '_bridge'/'_gwbridge' suffix, so those are not filtered out
    bridge_suffixes = ('_bridge', '_gwbridge')
    for plugin_path in connection_loader.all(path_only=True):
        plugin_name = os.path.splitext(os.path.basename(plugin_path))[0]
        plugin_prefix = re.compile('^ansible_%s_' % re.escape(plugin_name))
        doomed.update(
            name for name in present
            if (plugin_prefix.match(name) and not name.endswith(bridge_suffixes))
            or name.startswith('ansible_become_')
        )

    # some KNOWN keys
    known_keys = C.RESTRICTED_RESULT_KEYS + C.INTERNAL_RESULT_KEYS
    doomed.update(known for known in known_keys if known in present)

    # finally, interpreter keys
    interpreter_key = re.compile('^ansible_.*_interpreter$')
    doomed.update(name for name in present if interpreter_key.match(name))

    # now drop everything collected above (except for ssh host keys)
    for name in doomed:
        if name.startswith('ansible_ssh_host_key_'):
            continue
        display.warning("Removed restricted key from module data: %s" % name)
        del cleaned[name]

    return strip_internal_keys(cleaned)
def namespace_facts(facts):
    """ return all facts inside 'ansible_facts' w/o an ansible_ prefix

    Each value is copied with ``module_response_deepcopy``. ``ansible_local``
    is the one ``ansible_``-prefixed key that keeps its full name.
    """
    prefix = 'ansible_'
    keeps_prefix = ('ansible_local',)

    def _namespaced(name):
        # Drop the leading 'ansible_' unless the key is exempt or unprefixed.
        if name.startswith(prefix) and name not in keeps_prefix:
            return name[len(prefix):]
        return name

    return {
        'ansible_facts': {
            _namespaced(name): module_response_deepcopy(facts[name])
            for name in facts
        }
    }
|