1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
|
from strictyaml.ruamel.comments import CommentedSeq, CommentedMap
from strictyaml.exceptions import YAMLValidationError
from strictyaml.yamlpointer import YAMLPointer
from strictyaml import utils
from copy import deepcopy, copy
import sys
if sys.version_info[0] == 3:
unicode = str
class YAMLChunk(object):
"""
Represents a section of the document with references to the ruamel
parsed document and the strictparsed document.
Most operations done by validators on the document are done using this object.
Before validation the strictparsed document will be identical to the
ruamelparsed document. After it will contain CommentedMaps, CommentedSeqs
and YAML objects.
"""
def __init__(
self,
ruamelparsed,
pointer=None,
label=None,
strictparsed=None,
key_association=None,
):
self._ruamelparsed = ruamelparsed
self._strictparsed = (
deepcopy(ruamelparsed) if strictparsed is None else strictparsed
)
self._pointer = pointer if pointer is not None else YAMLPointer()
self._label = label
# Associates strictparsed key names with ruamelparsed key names
# E.g. "my-key-name" -> "My Key name"
self._key_association = {} if key_association is None else key_association
def expecting_but_found(self, expecting, found=None):
raise YAMLValidationError(
expecting,
found if found is not None else "found {0}".format(self.found()),
self,
)
def while_parsing_found(self, what, found=None):
self.expecting_but_found("while parsing {0}".format(what), found=found)
def process(self, new_item):
strictparsed = self.pointer.parent().get(self._strictparsed, strictdoc=True)
current_parsed = (
strictparsed._value if hasattr(strictparsed, "_value") else strictparsed
)
def actual_key_from_string_key(string_key):
if string_key in current_parsed.keys():
return string_key
else:
for key in current_parsed.keys():
if hasattr(key, "_value"):
if key.text == string_key:
return key
if self.pointer.is_index():
current_parsed[self.pointer.last_index] = new_item
elif self.pointer.is_val():
current_parsed[
actual_key_from_string_key(self.pointer.last_regularkey)
] = new_item
elif self.pointer.is_key():
key = actual_key_from_string_key(self.pointer.last_regularkey)
existing_val = current_parsed[key]
del current_parsed[key]
current_parsed[new_item] = existing_val
def is_sequence(self):
return isinstance(self.contents, CommentedSeq)
def is_mapping(self):
return isinstance(self.contents, CommentedMap)
def is_scalar(self):
return not isinstance(self.contents, (CommentedMap, CommentedSeq))
def found(self):
if self.is_sequence():
return "a sequence"
elif self.is_mapping():
return "a mapping"
elif self.contents == "":
return "a blank string"
elif utils.is_integer(self.contents):
return "an arbitrary integer"
elif utils.is_decimal(self.contents):
return "an arbitrary number"
else:
return "arbitrary text"
def expect_sequence(self, expecting="when expecting a sequence"):
if not self.is_sequence():
self.expecting_but_found(expecting, "found {0}".format(self.found()))
return [self.index(i) for i in range(len(self.contents))]
def expect_mapping(self):
if not self.is_mapping():
self.expecting_but_found(
"when expecting a mapping", "found {0}".format(self.found())
)
return [
(
self.key(regular_key, unicode(validated_key)),
self.val(unicode(validated_key)),
)
for (regular_key, validated_key) in zip(
self.contents.keys(), self.strictparsed().keys()
)
]
def expect_scalar(self, what):
if not self.is_scalar():
self.expecting_but_found(
"when expecting {0}".format(what), "found {0}".format(self.found())
)
@property
def label(self):
return self._label
@property
def whole_document(self):
return self._ruamelparsed
@property
def pointer(self):
return self._pointer
def fork(self, strictindex, new_value):
"""
Return a chunk referring to the same location in a duplicated document.
Used when modifying a YAML chunk so that the modification can be validated
before changing it.
"""
forked_chunk = YAMLChunk(
deepcopy(self._ruamelparsed),
pointer=self.pointer,
label=self.label,
key_association=copy(self._key_association),
)
if self.is_scalar():
# Necessary for e.g. EmptyDict, which reports as a scalar.
forked_chunk.pointer.set(forked_chunk, "_ruamelparsed", CommentedMap())
forked_chunk.pointer.set(
forked_chunk, "_strictparsed", CommentedMap(), strictdoc=True
)
forked_chunk.contents[self.ruamelindex(strictindex)] = new_value.as_marked_up()
forked_chunk.strictparsed()[strictindex] = deepcopy(new_value.as_marked_up())
return forked_chunk
def add_key_association(self, unprocessed_key, processed_key):
self._key_association[processed_key] = unprocessed_key
@property
def key_association(self):
return self._key_association
def make_child_of(self, chunk):
"""
Link one YAML chunk to another.
Used when inserting a chunk of YAML into another chunk.
"""
if self.is_mapping():
for key, value in self.contents.items():
self.key(key, key).pointer.make_child_of(chunk.pointer)
self.val(key).make_child_of(chunk)
elif self.is_sequence():
for index, item in enumerate(self.contents):
self.index(index).make_child_of(chunk)
else:
self.pointer.make_child_of(chunk.pointer)
def _select(self, pointer):
"""
Get a YAMLChunk referenced by a pointer.
"""
return YAMLChunk(
self._ruamelparsed,
pointer=pointer,
label=self._label,
strictparsed=self._strictparsed,
key_association=copy(self._key_association),
)
def index(self, strictindex):
"""
Return a chunk in a sequence referenced by index.
"""
return self._select(self._pointer.index(self.ruamelindex(strictindex)))
def ruamelindex(self, strictindex):
"""
Get the ruamel equivalent of a strict parsed index.
E.g. 0 -> 0, 1 -> 2, parsed-via-slugify -> Parsed via slugify
"""
return (
self.key_association.get(strictindex, strictindex)
if self.is_mapping()
else strictindex
)
def val(self, strictkey):
"""
Return a chunk referencing a value in a mapping with the key 'key'.
"""
ruamelkey = self.ruamelindex(strictkey)
return self._select(self._pointer.val(ruamelkey, strictkey))
def key(self, key, strictkey=None):
"""
Return a chunk referencing a key in a mapping with the name 'key'.
"""
return self._select(self._pointer.key(key, strictkey))
def textslice(self, start, end):
"""
Return a chunk referencing a slice of a scalar text value.
"""
return self._select(self._pointer.textslice(start, end))
def start_line(self):
return self._pointer.start_line(self._ruamelparsed)
def end_line(self):
return self._pointer.end_line(self._ruamelparsed)
def lines(self):
return self._pointer.lines(self._ruamelparsed)
def lines_before(self, how_many):
return self._pointer.lines_before(self._ruamelparsed, how_many)
def lines_after(self, how_many):
return self._pointer.lines_after(self._ruamelparsed, how_many)
@property
def contents(self):
return self._pointer.get(self._ruamelparsed)
def strictparsed(self):
return self._pointer.get(self._strictparsed, strictdoc=True)
|