1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
# Copyright (C) 2012 Yahoo! Inc.
#
# Author: Joshua Harlow <harlowja@yahoo-inc.com>
#
# This file is part of cloud-init. See LICENSE file for license information.
import re
from cloudinit import importer, type_utils
DEF_MERGE_TYPE = "list()+dict()+str()"
MERGER_PREFIX = "m_"
MERGER_ATTR = "Merger"
class UnknownMerger:
# Named differently so auto-method finding
# doesn't pick this up if there is ever a type
# named "unknown"
def _handle_unknown(self, _meth_wanted, value, _merge_with):
return value
# This merging will attempt to look for a '_on_X' method
# in our own object for a given object Y with type X,
# if found it will be called to perform the merge of a source
# object and a object to merge_with.
#
# If not found the merge will be given to a '_handle_unknown'
# function which can decide what to do wit the 2 values.
def merge(self, source, merge_with):
type_name = type_utils.obj_name(source)
type_name = type_name.lower()
method_name = "_on_%s" % (type_name)
meth = None
args = [source, merge_with]
if hasattr(self, method_name):
meth = getattr(self, method_name)
if not meth:
meth = self._handle_unknown
args.insert(0, method_name)
return meth(*args)
class LookupMerger(UnknownMerger):
def __init__(self, lookups=None):
UnknownMerger.__init__(self)
if lookups is None:
self._lookups = []
else:
self._lookups = lookups
def __str__(self):
return "LookupMerger: (%s)" % (len(self._lookups))
# For items which can not be merged by the parent this object
# will lookup in a internally maintained set of objects and
# find which one of those objects can perform the merge. If
# any of the contained objects have the needed method, they
# will be called to perform the merge.
def _handle_unknown(self, meth_wanted, value, merge_with):
meth = None
for merger in self._lookups:
if hasattr(merger, meth_wanted):
# First one that has that method/attr gets to be
# the one that will be called
meth = getattr(merger, meth_wanted)
break
if not meth:
return UnknownMerger._handle_unknown(
self, meth_wanted, value, merge_with
)
return meth(value, merge_with)
def dict_extract_mergers(config):
parsed_mergers: list = []
raw_mergers = config.pop("merge_how", None)
if raw_mergers is None:
raw_mergers = config.pop("merge_type", None)
if raw_mergers is None:
return parsed_mergers
if isinstance(raw_mergers, str):
return string_extract_mergers(raw_mergers)
for m in raw_mergers:
if isinstance(m, (dict)):
name = m["name"]
name = name.replace("-", "_").strip()
opts = m["settings"]
else:
name = m[0]
if len(m) >= 2:
opts = m[1:]
else:
opts = []
if name:
parsed_mergers.append((name, opts))
return parsed_mergers
def string_extract_mergers(merge_how):
parsed_mergers = []
for m_name in merge_how.split("+"):
# Canonicalize the name (so that it can be found
# even when users alter it in various ways)
m_name = m_name.lower().strip()
m_name = m_name.replace("-", "_")
if not m_name:
continue
match = re.match(r"(^[a-zA-Z_][A-Za-z0-9_]*)\((.*?)\)$", m_name)
if not match:
msg = "Matcher identifier '%s' is not in the right format" % (
m_name
)
raise ValueError(msg)
(m_name, m_ops) = match.groups()
m_ops = m_ops.strip().split(",")
m_ops = [m.strip().lower() for m in m_ops if m.strip()]
parsed_mergers.append((m_name, m_ops))
return parsed_mergers
def default_mergers():
return tuple(string_extract_mergers(DEF_MERGE_TYPE))
def construct(parsed_mergers):
mergers_to_be = []
for m_name, m_ops in parsed_mergers:
if not m_name.startswith(MERGER_PREFIX):
m_name = MERGER_PREFIX + str(m_name)
merger_locs, looked_locs = importer.find_module(
m_name, [__name__], [MERGER_ATTR]
)
if not merger_locs:
msg = (
"Could not find merger module named '%s' "
"with attribute '%s' (searched %s)"
% (m_name, MERGER_ATTR, looked_locs)
)
raise ImportError(msg)
else:
mod = importer.import_module(merger_locs[0])
mod_attr = getattr(mod, MERGER_ATTR)
mergers_to_be.append((mod_attr, m_ops))
# Now form them...
mergers: list = []
root = LookupMerger(mergers)
for attr, opts in mergers_to_be:
mergers.append(attr(root, opts))
return root
|