File: parser.py

package info (click to toggle)
python-dynaconf 3.2.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,900 kB
  • sloc: python: 21,464; sh: 9; makefile: 4
file content (214 lines) | stat: -rw-r--r-- 16,966 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
from __future__ import absolute_import
# Short aliases for literals that are referenced throughout this module
# (the naming suggests they were generated by a minifier).
# _D: format string for the "missing block end" error raised by the block
#     sequence and block mapping parsers.
_D='expected <block end>, but found %r'
# _C / _B / _A: aliases for True / False / None.
_C=True
_B=False
_A=None
from.error import MarkedYAMLError
from.tokens import*
from.events import*
from.scanner import Scanner,RoundTripScanner,ScannerError
from.compat import utf8,nprint,nprintf
if _B:from typing import Any,Dict,Optional,List
__all__=['Parser','RoundTripParser','ParserError']
class ParserError(MarkedYAMLError):
	"""Error raised by the parser for unexpected tokens or invalid directives."""
class Parser:
	DEFAULT_TAGS={'!':'!','!!':'tag:yaml.org,2002:'}
	def __init__(self,loader):
		self.loader=loader
		if self.loader is not _A and getattr(self.loader,'_parser',_A)is _A:self.loader._parser=self
		self.reset_parser()
	def reset_parser(self):self.current_event=_A;self.tag_handles={};self.states=[];self.marks=[];self.state=self.parse_stream_start
	def dispose(self):self.reset_parser()
	@property
	def scanner(self):
		if hasattr(self.loader,'typ'):return self.loader.scanner
		return self.loader._scanner
	@property
	def resolver(self):
		if hasattr(self.loader,'typ'):return self.loader.resolver
		return self.loader._resolver
	def check_event(self,*choices):
		if self.current_event is _A:
			if self.state:self.current_event=self.state()
		if self.current_event is not _A:
			if not choices:return _C
			for choice in choices:
				if isinstance(self.current_event,choice):return _C
		return _B
	def peek_event(self):
		if self.current_event is _A:
			if self.state:self.current_event=self.state()
		return self.current_event
	def get_event(self):
		if self.current_event is _A:
			if self.state:self.current_event=self.state()
		value=self.current_event;self.current_event=_A;return value
	def parse_stream_start(self):
		"""Consume the first token and return a StreamStartEvent built from it.

		The next state becomes ``parse_implicit_document_start``.
		"""
		first = self.scanner.get_token()
		# move_comment is defined on the token class (not visible here);
		# presumably it hands comments attached to this token to the next one.
		first.move_comment(self.scanner.peek_token())
		event = StreamStartEvent(first.start_mark, first.end_mark, encoding=first.encoding)
		self.state = self.parse_implicit_document_start
		return event
	def parse_implicit_document_start(self):
		"""Start a document that has no directives and no explicit start marker.

		If the next token is a directive, a document start or the stream end,
		the work is delegated to ``parse_document_start``.  Otherwise a
		non-explicit DocumentStartEvent with zero width is returned and the
		parser continues with a block node.
		"""
		if self.scanner.check_token(DirectiveToken, DocumentStartToken, StreamEndToken):
			return self.parse_document_start()
		# NOTE(review): this binds the class-level DEFAULT_TAGS dict itself,
		# not a copy; process_directives() replaces it before mutating.
		self.tag_handles = self.DEFAULT_TAGS
		mark = self.scanner.peek_token().start_mark
		event = DocumentStartEvent(mark, mark, explicit=False)
		self.states.append(self.parse_document_end)
		self.state = self.parse_block_node
		return event
	def parse_document_start(self):
		"""Parse an explicit document start, or the end of the stream.

		Leading document-end tokens are skipped.  At the stream end a
		StreamEndEvent is returned and the state machine stops (state becomes
		None).  Otherwise directives are processed and a document-start token
		is required; its absence raises ParserError.
		"""
		# Skip any extra document end indicators.
		while self.scanner.check_token(DocumentEndToken):
			self.scanner.get_token()
		if self.scanner.check_token(StreamEndToken):
			last = self.scanner.get_token()
			event = StreamEndEvent(last.start_mark, last.end_mark, comment=last.comment)
			assert not self.states
			assert not self.marks
			self.state = None
			return event
		start_mark = self.scanner.peek_token().start_mark
		version, tags = self.process_directives()
		if not self.scanner.check_token(DocumentStartToken):
			raise ParserError(
				None,
				None,
				"expected '<document start>', but found %r" % self.scanner.peek_token().id,
				self.scanner.peek_token().start_mark,
			)
		end_mark = self.scanner.get_token().end_mark
		event = DocumentStartEvent(start_mark, end_mark, explicit=True, version=version, tags=tags)
		self.states.append(self.parse_document_end)
		self.state = self.parse_document_content
		return event
	def parse_document_end(self):
		"""Return a DocumentEndEvent, consuming a document-end token if present.

		The event is explicit only when a document-end token was consumed.
		When the resolver's processing version is (1, 1) the next state is
		``parse_document_start``; otherwise ``parse_implicit_document_start``.
		"""
		start_mark = self.scanner.peek_token().start_mark
		end_mark = start_mark
		explicit = False
		if self.scanner.check_token(DocumentEndToken):
			end_mark = self.scanner.get_token().end_mark
			explicit = True
		event = DocumentEndEvent(start_mark, end_mark, explicit=explicit)
		if self.resolver.processing_version == (1, 1):
			following = self.parse_document_start
		else:
			following = self.parse_implicit_document_start
		self.state = following
		return event
	def parse_document_content(self):
		"""Parse the root node of an explicit document.

		A document whose content is immediately followed by a directive,
		document start/end or stream end is empty and yields an empty scalar;
		anything else is parsed as a block node.
		"""
		if not self.scanner.check_token(DirectiveToken, DocumentStartToken, DocumentEndToken, StreamEndToken):
			return self.parse_block_node()
		empty = self.process_empty_scalar(self.scanner.peek_token().start_mark)
		self.state = self.states.pop()
		return empty
	def process_directives(self):
		"""Consume directive tokens and return ``(yaml_version, tags)``.

		``yaml_version`` is the value of the YAML directive (None if absent).
		``tags`` is a copy of the handles declared by TAG directives, or None
		when there were none.  As side effects ``self.tag_handles`` is rebuilt
		(declared handles plus any missing DEFAULT_TAGS entries) and, if the
		loader has a ``tags`` attribute, the version and declared handles are
		recorded on the loader.

		Raises ParserError for a duplicate YAML directive, a major version
		other than 1, or a duplicate tag handle.
		"""
		yaml_version = None
		self.tag_handles = {}
		while self.scanner.check_token(DirectiveToken):
			directive = self.scanner.get_token()
			if directive.name == 'YAML':
				if yaml_version is not None:
					raise ParserError(None, None, 'found duplicate YAML directive', directive.start_mark)
				major, _minor = directive.value
				if major != 1:
					raise ParserError(
						None,
						None,
						'found incompatible YAML document (version 1.* is required)',
						directive.start_mark,
					)
				yaml_version = directive.value
			elif directive.name == 'TAG':
				handle, prefix = directive.value
				if handle in self.tag_handles:
					raise ParserError(None, None, 'duplicate tag handle %r' % utf8(handle), directive.start_mark)
				self.tag_handles[handle] = prefix
		# Snapshot the declared handles before the defaults are merged in.
		declared = self.tag_handles.copy() if self.tag_handles else None
		result = (yaml_version, declared)
		if self.loader is not None and hasattr(self.loader, 'tags'):
			self.loader.version = yaml_version
			if self.loader.tags is None:
				self.loader.tags = {}
			for handle, prefix in self.tag_handles.items():
				self.loader.tags[handle] = prefix
		for handle, prefix in self.DEFAULT_TAGS.items():
			self.tag_handles.setdefault(handle, prefix)
		return result
	def parse_block_node(self):return self.parse_node(block=_C)
	def parse_flow_node(self):return self.parse_node()
	def parse_block_node_or_indentless_sequence(self):return self.parse_node(block=_C,indentless_sequence=_C)
	def transform_tag(self,handle,suffix):return self.tag_handles[handle]+suffix
	def parse_node(self, block=False, indentless_sequence=False):
		"""Parse one node and return the event that starts (or fully describes) it.

		An alias token yields an AliasEvent.  Otherwise an optional anchor and
		tag (in either order) are consumed, the tag handle is expanded with
		``transform_tag``, and the next token selects the event: a ScalarEvent,
		or a Sequence/MappingStartEvent for flow collections and, when *block*
		is true, for block collections.  When *indentless_sequence* is true a
		block entry token starts a sequence directly.  A node consisting only
		of an anchor and/or tag becomes an empty scalar.

		Raises ParserError for an undefined tag handle, or when the current
		token cannot start a node.
		"""
		if self.scanner.check_token(AliasToken):
			token = self.scanner.get_token()
			event = AliasEvent(token.value, token.start_mark, token.end_mark)
			self.state = self.states.pop()
			return event

		anchor = None
		tag = None
		start_mark = end_mark = tag_mark = None
		if self.scanner.check_token(AnchorToken):
			token = self.scanner.get_token()
			start_mark = token.start_mark
			end_mark = token.end_mark
			anchor = token.value
			if self.scanner.check_token(TagToken):
				token = self.scanner.get_token()
				tag_mark = token.start_mark
				end_mark = token.end_mark
				tag = token.value
		elif self.scanner.check_token(TagToken):
			token = self.scanner.get_token()
			start_mark = tag_mark = token.start_mark
			end_mark = token.end_mark
			tag = token.value
			if self.scanner.check_token(AnchorToken):
				# The anchor follows the tag, so only the end of the node
				# properties moves.  start_mark must stay on the first
				# property and tag_mark on the tag itself; overwriting them
				# with the anchor position made the "undefined tag handle"
				# error point at the anchor instead of the tag.
				token = self.scanner.get_token()
				end_mark = token.end_mark
				anchor = token.value

		if tag is not None:
			handle, suffix = tag
			if handle is not None:
				if handle not in self.tag_handles:
					raise ParserError(
						'while parsing a node',
						start_mark,
						'found undefined tag handle %r' % utf8(handle),
						tag_mark,
					)
				tag = self.transform_tag(handle, suffix)
			else:
				tag = suffix

		if start_mark is None:
			# No anchor or tag: the node starts at the upcoming token.
			start_mark = end_mark = self.scanner.peek_token().start_mark
		event = None
		implicit = tag is None or tag == '!'

		if indentless_sequence and self.scanner.check_token(BlockEntryToken):
			comment = None
			pt = self.scanner.peek_token()
			if pt.comment and pt.comment[0]:
				# Take over the first comment slot of the entry token and clear
				# it there so it is not reported twice.
				comment = [pt.comment[0], []]
				pt.comment[0] = None
			end_mark = self.scanner.peek_token().end_mark
			event = SequenceStartEvent(
				anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
			)
			self.state = self.parse_indentless_sequence_entry
			return event

		if self.scanner.check_token(ScalarToken):
			token = self.scanner.get_token()
			end_mark = token.end_mark
			# For scalars "implicit" is a pair of flags passed to ScalarEvent.
			if (token.plain and tag is None) or tag == '!':
				implicit = (True, False)
			elif tag is None:
				implicit = (False, True)
			else:
				implicit = (False, False)
			event = ScalarEvent(
				anchor,
				tag,
				implicit,
				token.value,
				start_mark,
				end_mark,
				style=token.style,
				comment=token.comment,
			)
			self.state = self.states.pop()
		elif self.scanner.check_token(FlowSequenceStartToken):
			pt = self.scanner.peek_token()
			end_mark = pt.end_mark
			event = SequenceStartEvent(
				anchor, tag, implicit, start_mark, end_mark, flow_style=True, comment=pt.comment
			)
			self.state = self.parse_flow_sequence_first_entry
		elif self.scanner.check_token(FlowMappingStartToken):
			pt = self.scanner.peek_token()
			end_mark = pt.end_mark
			event = MappingStartEvent(
				anchor, tag, implicit, start_mark, end_mark, flow_style=True, comment=pt.comment
			)
			self.state = self.parse_flow_mapping_first_key
		elif block and self.scanner.check_token(BlockSequenceStartToken):
			end_mark = self.scanner.peek_token().start_mark
			pt = self.scanner.peek_token()
			comment = pt.comment
			if comment is None or comment[1] is None:
				comment = pt.split_comment()
			event = SequenceStartEvent(
				anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
			)
			self.state = self.parse_block_sequence_first_entry
		elif block and self.scanner.check_token(BlockMappingStartToken):
			end_mark = self.scanner.peek_token().start_mark
			comment = self.scanner.peek_token().comment
			event = MappingStartEvent(
				anchor, tag, implicit, start_mark, end_mark, flow_style=False, comment=comment
			)
			self.state = self.parse_block_mapping_first_key
		elif anchor is not None or tag is not None:
			# Properties without content: emit an empty scalar.
			event = ScalarEvent(anchor, tag, (implicit, False), '', start_mark, end_mark)
			self.state = self.states.pop()
		else:
			node = 'block' if block else 'flow'
			token = self.scanner.peek_token()
			raise ParserError(
				'while parsing a %s node' % node,
				start_mark,
				'expected the node content, but found %r' % token.id,
				token.start_mark,
			)
		return event
	def parse_block_sequence_first_entry(self):token=self.scanner.get_token();self.marks.append(token.start_mark);return self.parse_block_sequence_entry()
	def parse_block_sequence_entry(self):
		"""Parse the next entry of a block sequence, or its end.

		After a block entry token either a block node is parsed or, when
		another entry or the block end follows immediately, an empty scalar is
		returned.  Without an entry token a block end token is required
		(ParserError otherwise) and a SequenceEndEvent is returned.
		"""
		if self.scanner.check_token(BlockEntryToken):
			dash = self.scanner.get_token()
			dash.move_comment(self.scanner.peek_token())
			if self.scanner.check_token(BlockEntryToken, BlockEndToken):
				# Entry indicator with nothing after it: empty item.
				self.state = self.parse_block_sequence_entry
				return self.process_empty_scalar(dash.end_mark)
			self.states.append(self.parse_block_sequence_entry)
			return self.parse_block_node()
		if not self.scanner.check_token(BlockEndToken):
			unexpected = self.scanner.peek_token()
			raise ParserError(
				'while parsing a block collection',
				self.marks[-1],
				'expected <block end>, but found %r' % unexpected.id,
				unexpected.start_mark,
			)
		closer = self.scanner.get_token()
		event = SequenceEndEvent(closer.start_mark, closer.end_mark, comment=closer.comment)
		self.state = self.states.pop()
		self.marks.pop()
		return event
	def parse_indentless_sequence_entry(self):
		"""Parse the next entry of an indentless sequence, or end it.

		The sequence ends (zero-width SequenceEndEvent at the upcoming token's
		start) as soon as the next token is not a block entry; no token is
		consumed for the end.
		"""
		if not self.scanner.check_token(BlockEntryToken):
			upcoming = self.scanner.peek_token()
			event = SequenceEndEvent(upcoming.start_mark, upcoming.start_mark, comment=upcoming.comment)
			self.state = self.states.pop()
			return event
		dash = self.scanner.get_token()
		dash.move_comment(self.scanner.peek_token())
		if self.scanner.check_token(BlockEntryToken, KeyToken, ValueToken, BlockEndToken):
			# Entry indicator with nothing after it: empty item.
			self.state = self.parse_indentless_sequence_entry
			return self.process_empty_scalar(dash.end_mark)
		self.states.append(self.parse_indentless_sequence_entry)
		return self.parse_block_node()
	def parse_block_mapping_first_key(self):token=self.scanner.get_token();self.marks.append(token.start_mark);return self.parse_block_mapping_key()
	def parse_block_mapping_key(self):
		"""Parse the next key of a block mapping, or its end.

		After a key token either a node is parsed or, if a key, value or block
		end follows immediately, an empty scalar stands in for the key.  When
		the resolver's processing version is above (1, 1), a value token
		without a key token also yields an empty key.  Otherwise a block end
		token is required (ParserError if missing) and a MappingEndEvent is
		returned.
		"""
		if self.scanner.check_token(KeyToken):
			key_token = self.scanner.get_token()
			key_token.move_comment(self.scanner.peek_token())
			if self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
				self.state = self.parse_block_mapping_value
				return self.process_empty_scalar(key_token.end_mark)
			self.states.append(self.parse_block_mapping_value)
			return self.parse_block_node_or_indentless_sequence()
		if self.resolver.processing_version > (1, 1) and self.scanner.check_token(ValueToken):
			self.state = self.parse_block_mapping_value
			return self.process_empty_scalar(self.scanner.peek_token().start_mark)
		if not self.scanner.check_token(BlockEndToken):
			unexpected = self.scanner.peek_token()
			raise ParserError(
				'while parsing a block mapping',
				self.marks[-1],
				'expected <block end>, but found %r' % unexpected.id,
				unexpected.start_mark,
			)
		closer = self.scanner.get_token()
		closer.move_comment(self.scanner.peek_token())
		event = MappingEndEvent(closer.start_mark, closer.end_mark, comment=closer.comment)
		self.state = self.states.pop()
		self.marks.pop()
		return event
	def parse_block_mapping_value(self):
		"""Parse the value belonging to the block mapping key just read.

		Without a value token the value is an empty scalar at the upcoming
		token's start.  With a value token, either a node is parsed or, when a
		key, value or block end follows immediately, an empty scalar carrying
		the relevant comment is returned.
		"""
		if not self.scanner.check_token(ValueToken):
			self.state = self.parse_block_mapping_key
			upcoming = self.scanner.peek_token()
			return self.process_empty_scalar(upcoming.start_mark)
		token = self.scanner.get_token()
		if self.scanner.check_token(ValueToken):
			token.move_comment(self.scanner.peek_token())
		elif not self.scanner.check_token(KeyToken):
			token.move_comment(self.scanner.peek_token(), empty=True)
		if not self.scanner.check_token(KeyToken, ValueToken, BlockEndToken):
			self.states.append(self.parse_block_mapping_key)
			return self.parse_block_node_or_indentless_sequence()
		self.state = self.parse_block_mapping_key
		comment = token.comment
		if comment is None:
			# Fall back to the upcoming token's comment.  ``token`` is rebound
			# on purpose to mirror the original: the empty scalar below is then
			# marked with the upcoming token's end mark.
			token = self.scanner.peek_token()
			comment = token.comment
			if comment:
				# Split the comment: the second slot stays on the token, the
				# first slot goes to the empty scalar.
				token._comment = [None, comment[1]]
				comment = [comment[0], None]
		return self.process_empty_scalar(token.end_mark, comment=comment)
	def parse_flow_sequence_first_entry(self):token=self.scanner.get_token();self.marks.append(token.start_mark);return self.parse_flow_sequence_entry(first=_C)
	def parse_flow_sequence_entry(self, first=False):
		"""Parse the next entry of a flow sequence, or its end.

		Every entry except the first must be preceded by a flow entry token
		(ParserError otherwise).  A key token inside the sequence starts a
		single-pair flow mapping.  At the sequence end token a SequenceEndEvent
		is returned.
		"""
		if not self.scanner.check_token(FlowSequenceEndToken):
			if not first:
				if not self.scanner.check_token(FlowEntryToken):
					unexpected = self.scanner.peek_token()
					raise ParserError(
						'while parsing a flow sequence',
						self.marks[-1],
						"expected ',' or ']', but got %r" % unexpected.id,
						unexpected.start_mark,
					)
				self.scanner.get_token()
			if self.scanner.check_token(KeyToken):
				# The key token is only peeked here; the next state consumes it.
				key_token = self.scanner.peek_token()
				event = MappingStartEvent(
					None, None, True, key_token.start_mark, key_token.end_mark, flow_style=True
				)
				self.state = self.parse_flow_sequence_entry_mapping_key
				return event
			if not self.scanner.check_token(FlowSequenceEndToken):
				self.states.append(self.parse_flow_sequence_entry)
				return self.parse_flow_node()
		closer = self.scanner.get_token()
		event = SequenceEndEvent(closer.start_mark, closer.end_mark, comment=closer.comment)
		self.state = self.states.pop()
		self.marks.pop()
		return event
	def parse_flow_sequence_entry_mapping_key(self):
		"""Consume the key token of a single-pair mapping inside a flow sequence and parse its key.

		If a value, flow entry or sequence end follows immediately, the key is
		an empty scalar.
		"""
		key_token = self.scanner.get_token()
		if self.scanner.check_token(ValueToken, FlowEntryToken, FlowSequenceEndToken):
			self.state = self.parse_flow_sequence_entry_mapping_value
			return self.process_empty_scalar(key_token.end_mark)
		self.states.append(self.parse_flow_sequence_entry_mapping_value)
		return self.parse_flow_node()
	def parse_flow_sequence_entry_mapping_value(self):
		"""Parse the value of a single-pair mapping inside a flow sequence.

		A missing value token, or one directly followed by a flow entry or the
		sequence end, produces an empty scalar.
		"""
		if not self.scanner.check_token(ValueToken):
			self.state = self.parse_flow_sequence_entry_mapping_end
			upcoming = self.scanner.peek_token()
			return self.process_empty_scalar(upcoming.start_mark)
		value_token = self.scanner.get_token()
		if self.scanner.check_token(FlowEntryToken, FlowSequenceEndToken):
			self.state = self.parse_flow_sequence_entry_mapping_end
			return self.process_empty_scalar(value_token.end_mark)
		self.states.append(self.parse_flow_sequence_entry_mapping_end)
		return self.parse_flow_node()
	def parse_flow_sequence_entry_mapping_end(self):
		"""Close the single-pair mapping inside a flow sequence with a zero-width MappingEndEvent."""
		self.state = self.parse_flow_sequence_entry
		# No token is consumed: the event sits at the start of the next token.
		upcoming = self.scanner.peek_token()
		return MappingEndEvent(upcoming.start_mark, upcoming.start_mark)
	def parse_flow_mapping_first_key(self):token=self.scanner.get_token();self.marks.append(token.start_mark);return self.parse_flow_mapping_key(first=_C)
	def parse_flow_mapping_key(self, first=False):
		"""Parse the next key of a flow mapping, or its end.

		Every key except the first must be preceded by a flow entry token
		(ParserError otherwise).  A key token followed directly by a value,
		flow entry or mapping end yields an empty key.  When the resolver's
		processing version is above (1, 1), a value token without a key token
		also yields an empty key.  Any other content is parsed as a key whose
		value is empty.  At the mapping end token a MappingEndEvent is returned.
		"""
		if not self.scanner.check_token(FlowMappingEndToken):
			if not first:
				if not self.scanner.check_token(FlowEntryToken):
					unexpected = self.scanner.peek_token()
					raise ParserError(
						'while parsing a flow mapping',
						self.marks[-1],
						"expected ',' or '}', but got %r" % unexpected.id,
						unexpected.start_mark,
					)
				self.scanner.get_token()
			if self.scanner.check_token(KeyToken):
				key_token = self.scanner.get_token()
				if self.scanner.check_token(ValueToken, FlowEntryToken, FlowMappingEndToken):
					self.state = self.parse_flow_mapping_value
					return self.process_empty_scalar(key_token.end_mark)
				self.states.append(self.parse_flow_mapping_value)
				return self.parse_flow_node()
			if self.resolver.processing_version > (1, 1) and self.scanner.check_token(ValueToken):
				self.state = self.parse_flow_mapping_value
				# NOTE(review): uses the value token's end_mark, whereas the
				# block-mapping counterpart uses start_mark; kept as in the original.
				return self.process_empty_scalar(self.scanner.peek_token().end_mark)
			if not self.scanner.check_token(FlowMappingEndToken):
				self.states.append(self.parse_flow_mapping_empty_value)
				return self.parse_flow_node()
		closer = self.scanner.get_token()
		event = MappingEndEvent(closer.start_mark, closer.end_mark, comment=closer.comment)
		self.state = self.states.pop()
		self.marks.pop()
		return event
	def parse_flow_mapping_value(self):
		"""Parse the value belonging to the flow mapping key just read.

		A missing value token, or one directly followed by a flow entry or the
		mapping end, produces an empty scalar.
		"""
		if not self.scanner.check_token(ValueToken):
			self.state = self.parse_flow_mapping_key
			upcoming = self.scanner.peek_token()
			return self.process_empty_scalar(upcoming.start_mark)
		value_token = self.scanner.get_token()
		if self.scanner.check_token(FlowEntryToken, FlowMappingEndToken):
			self.state = self.parse_flow_mapping_key
			return self.process_empty_scalar(value_token.end_mark)
		self.states.append(self.parse_flow_mapping_key)
		return self.parse_flow_node()
	def parse_flow_mapping_empty_value(self):self.state=self.parse_flow_mapping_key;return self.process_empty_scalar(self.scanner.peek_token().start_mark)
	def process_empty_scalar(self, mark, comment=None):
		"""Build a zero-width ScalarEvent with an empty value at *mark*.

		The event has no anchor and no tag, uses the implicit pair
		``(True, False)``, and carries *comment* unchanged.
		"""
		implicit = (True, False)
		return ScalarEvent(None, None, implicit, '', mark, mark, comment=comment)
class RoundTripParser(Parser):
	"""Parser variant that expands the ``!!`` handle only for a fixed list of type names."""

	def transform_tag(self, handle, suffix):
		"""Return the tag for *handle* and *suffix*.

		``!!`` combined with one of the listed suffixes is expanded through
		``Parser.transform_tag``; every other tag is returned as the plain
		concatenation ``handle + suffix``.
		"""
		expandable = (
			'null',
			'bool',
			'int',
			'float',
			'binary',
			'timestamp',
			'omap',
			'pairs',
			'set',
			'str',
			'seq',
			'map',
		)
		if handle != '!!' or suffix not in expandable:
			return handle + suffix
		return Parser.transform_tag(self, handle, suffix)