File: extended_yaml_parser.py

package info (click to toggle)
crazy-complete 0.3.6-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 2,404 kB
  • sloc: python: 7,949; sh: 4,636; makefile: 74
file content (133 lines) | stat: -rw-r--r-- 4,477 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
'''YAML Parser with column and line information.'''

import yaml
from yaml.events import (StreamStartEvent, DocumentStartEvent, DocumentEndEvent,
                         MappingStartEvent, SequenceStartEvent, ScalarEvent,
                         MappingEndEvent, SequenceEndEvent, AliasEvent)

from .value_with_trace import ValueWithTrace
from .errors import CrazySchemaValidationError


_error = CrazySchemaValidationError


class ExtendedYAMLParser:
    '''Parse YAML with the ability to trace the origin of parsed values.'''

    def __init__(self):
        self.src = None
        self.data = []
        self.current_stack = []
        self.current_key = None

    def parse(self, stream):
        """
        Parses the given YAML stream in a SAX-like way and reconstructs the structure.

        Raises:
            - yaml.parser.ParserError
            - errors.CrazySchemaValidationError
        """
        self.src = stream
        self.data = []
        self.current_stack = []
        self.current_key = None

        loader = yaml.Loader(stream)
        try:
            while True:
                event = loader.get_event()
                if event is None:
                    break
                self.handle_event(event)
        finally:
            loader.dispose()

        return self.data

    def handle_event(self, event):
        """
        Handle each YAML parsing event and construct the corresponding data structure.
        """
        if isinstance(event, StreamStartEvent):
            pass
        elif isinstance(event, DocumentStartEvent):
            pass
        elif isinstance(event, DocumentEndEvent):
            if self.current_stack:
                self.data.append(self.current_stack.pop())
        elif isinstance(event, MappingStartEvent):
            new_map = ValueWithTrace.from_yaml_event({}, self.src, event)
            self.add_to_current_structure(self.current_key, new_map)
            self.current_stack.append(new_map)
            self.current_key = None
        elif isinstance(event, SequenceStartEvent):
            new_seq = ValueWithTrace.from_yaml_event([], self.src, event)
            self.add_to_current_structure(self.current_key, new_seq)
            self.current_stack.append(new_seq)
            self.current_key = None
        elif isinstance(event, ScalarEvent):
            value = self.convert_scalar(event.value, event.implicit)
            value = ValueWithTrace.from_yaml_event(value, self.src, event)

            if self.current_stack and isinstance(self.current_stack[-1].value, list):
                self.add_to_current_structure(None, value)
            else:
                if self.current_key is None:
                    self.current_key = value
                else:
                    self.add_to_current_structure(self.current_key, value)
                    self.current_key = None
        elif isinstance(event, (MappingEndEvent, SequenceEndEvent)):
            self.current_stack.pop()
        elif isinstance(event, AliasEvent):
            value = ValueWithTrace.from_yaml_event(None, self.src, event)
            raise _error('Anchors not supported', value)

    def add_to_current_structure(self, key, value):
        """
        Add a value to the current structure (either dict or list).
        """
        if not self.current_stack:
            # Root-level element
            self.current_stack.append(value)
            return

        current = self.current_stack[-1]
        if isinstance(current.value, dict):
            if key is None:
                raise _error("Missing key for mapping in YAML", value)
            current.value[key] = value
        elif isinstance(current.value, list):
            current.value.append(value)
        else:
            raise _error("Invalid structure in YAML", value)

    def convert_scalar(self, value, implicit):
        """
        Converts a scalar based on its type hints (implicit tuple).
        """
        if implicit[0]:
            lower = value.lower()

            if lower in ("null", "~", ""):
                return None

            if lower in ("true", "on", "yes"):
                return True

            if lower in ("false", "off", "no"):
                return False

            try:
                return int(value)
            except ValueError:
                pass

            try:
                return float(value)
            except ValueError:
                pass

        return value