1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
|
'''YAML Parser with column and line information.'''
import yaml
from yaml.events import (StreamStartEvent, DocumentStartEvent, DocumentEndEvent,
MappingStartEvent, SequenceStartEvent, ScalarEvent,
MappingEndEvent, SequenceEndEvent, AliasEvent)
from .value_with_trace import ValueWithTrace
from .errors import CrazySchemaValidationError
# Short module-local alias for the exception type raised below when the YAML
# uses an unsupported feature or has a structure this parser cannot build.
_error = CrazySchemaValidationError
class ExtendedYAMLParser:
    """Parse YAML with the ability to trace the origin of parsed values.

    The parser consumes PyYAML's low-level event stream and rebuilds the
    document tree itself, wrapping every node (mapping, sequence, scalar) in
    a ``ValueWithTrace`` created from the event that produced it.

    Attributes:
        src: The stream most recently handed to :meth:`parse`.
        data: Root nodes collected so far, one per YAML document.
        current_stack: Containers currently being filled; the last entry is
            the innermost one.
        current_key: Mapping key that has been read but whose value has not
            arrived yet, or ``None``.
    """

    def __init__(self):
        self.src = None
        self.data = []
        self.current_stack = []
        self.current_key = None

    def parse(self, stream):
        """
        Parses the given YAML stream in a SAX-like way and reconstructs the structure.

        Returns:
            The list of root nodes, one entry per document in the stream.

        Raises:
        - yaml.parser.ParserError
        - errors.CrazySchemaValidationError
        """
        self.src = stream
        self.data = []
        self.current_stack = []
        self.current_key = None
        # Only the event API of the loader is used here (get_event); no YAML
        # constructors are invoked by this method.
        loader = yaml.Loader(stream)
        try:
            # get_event() yields None once the stream is exhausted.
            for event in iter(loader.get_event, None):
                self.handle_event(event)
        finally:
            loader.dispose()
        return self.data

    def handle_event(self, event):
        """
        Handle each YAML parsing event and construct the corresponding data structure.

        Raises:
            errors.CrazySchemaValidationError: on an alias event, or when a
                node cannot be attached to the current container.
        """
        if isinstance(event, (StreamStartEvent, DocumentStartEvent)):
            return

        if isinstance(event, MappingStartEvent):
            self._open_container({}, event)
        elif isinstance(event, SequenceStartEvent):
            self._open_container([], event)
        elif isinstance(event, ScalarEvent):
            value = self.convert_scalar(event.value, event.implicit)
            value = ValueWithTrace.from_yaml_event(value, self.src, event)
            stack = self.current_stack
            if not stack or isinstance(stack[-1].value, list):
                # Sequence items carry no key.  An empty stack means the
                # scalar *is* the document root: it must be pushed as the
                # root node, otherwise it would be mistaken for a mapping
                # key, dropped from the result and leaked into the next
                # document.
                self.add_to_current_structure(None, value)
            elif self.current_key is None:
                # First scalar of a key/value pair: remember it as the key.
                self.current_key = value
            else:
                self.add_to_current_structure(self.current_key, value)
                self.current_key = None
        elif isinstance(event, (MappingEndEvent, SequenceEndEvent)):
            self.current_stack.pop()
        elif isinstance(event, DocumentEndEvent):
            # The root node is still on the stack (a root container is
            # pushed twice when it is opened); move it into the results.
            if self.current_stack:
                self.data.append(self.current_stack.pop())
        elif isinstance(event, AliasEvent):
            value = ValueWithTrace.from_yaml_event(None, self.src, event)
            raise _error('Anchors not supported', value)
        # Any other event type is ignored.

    def _open_container(self, empty, event):
        """Wrap *empty* ({} or []) as a new node, attach it and make it current."""
        node = ValueWithTrace.from_yaml_event(empty, self.src, event)
        self.add_to_current_structure(self.current_key, node)
        self.current_stack.append(node)
        self.current_key = None

    def add_to_current_structure(self, key, value):
        """
        Add a value to the current structure (either dict or list).

        With an empty stack the value becomes the root element and is pushed
        onto the stack; ``key`` is ignored in that case and for lists.

        Raises:
            errors.CrazySchemaValidationError: if the current container is a
                dict and ``key`` is None, or if the current node is neither a
                dict nor a list.
        """
        if not self.current_stack:
            # Root-level element
            self.current_stack.append(value)
            return
        current = self.current_stack[-1]
        if isinstance(current.value, dict):
            if key is None:
                raise _error("Missing key for mapping in YAML", value)
            current.value[key] = value
        elif isinstance(current.value, list):
            current.value.append(value)
        else:
            raise _error("Invalid structure in YAML", value)

    def convert_scalar(self, value, implicit):
        """
        Converts a scalar based on its type hints (implicit tuple).

        Only scalars whose ``implicit[0]`` flag is set are converted; all
        others are returned unchanged as strings.  Conversion order:
        null words -> None, boolean words -> bool, then int, then float,
        falling back to the original string.
        """
        if not implicit[0]:
            return value

        lower = value.lower()
        if lower in ("null", "~", ""):
            return None
        if lower in ("true", "on", "yes"):
            return True
        if lower in ("false", "off", "no"):
            return False

        try:
            return int(value)
        except ValueError:
            pass

        # Python's float() accepts the bare words "nan", "inf" and "infinity"
        # (any case, optional sign).  In YAML those are ordinary strings --
        # only the dotted forms such as ".nan" / ".inf" denote floats -- so
        # they must not be coerced here.
        if lower.lstrip("+-") in ("nan", "inf", "infinity"):
            return value

        try:
            return float(value)
        except ValueError:
            return value
|