File: resolver.py

package info (click to toggle)
python-dynaconf 3.2.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,900 kB
  • sloc: python: 21,464; sh: 9; makefile: 4
file content (157 lines) | stat: -rw-r--r-- 9,516 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from __future__ import absolute_import
# Short aliases for literals that are repeated throughout this module.
# Name of the class attribute holding the implicit-resolver table; it is
# looked up in ``cls.__dict__`` to decide whether a class owns its own copy.
_H='yaml_implicit_resolvers'
# First characters registered for the implicit int resolvers.
_G='-+0123456789'
# Tag assigned to implicitly resolved integers.
_F='tag:yaml.org,2002:int'
# First characters registered for the implicit float resolvers.
_E='-+0123456789.'
# Tag assigned to implicitly resolved floats.
_D='tag:yaml.org,2002:float'
# Tag assigned to implicitly resolved booleans.
_C='tag:yaml.org,2002:bool'
# Alias for False; also guards the typing-only import block below.
_B=False
# Alias for None.
_A=None
import re
if _B:from typing import Any,Dict,List,Union,Text,Optional;from.compat import VersionType
from.compat import string_types,_DEFAULT_YAML_VERSION
from.error import*
from.nodes import MappingNode,ScalarNode,SequenceNode
from.util import RegExp
# Public names exported by this module.
__all__=['BaseResolver','Resolver','VersionedResolver']
# Table of implicit scalar resolvers. Each entry is a 4-tuple:
#   (list of YAML versions the rule applies to, tag, RegExp, first characters)
# The first-characters element lists the characters a scalar may start with
# for the rule to be tried ('' is used for the empty scalar).
# Most patterns are built with re.X, so the embedded newlines, indentation and
# the '# sexagesimal float' note inside the pattern text are presumably ignored
# by the regex engine (assumes RegExp forwards its flags to re.compile --
# TODO confirm in .util).
# BaseResolver.add_implicit_resolver appends further entries to this list at
# runtime.
implicit_resolvers=[([(1,2)],_C,RegExp('^(?:true|True|TRUE|false|False|FALSE)$',re.X),list('tTfF')),([(1,1)],_C,RegExp('^(?:y|Y|yes|Yes|YES|n|N|no|No|NO\n        |true|True|TRUE|false|False|FALSE\n        |on|On|ON|off|Off|OFF)$',re.X),list('yYnNtTfFoO')),([(1,2)],_D,RegExp('^(?:\n         [-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?\n        |[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)\n        |[-+]?\\.[0-9_]+(?:[eE][-+][0-9]+)?\n        |[-+]?\\.(?:inf|Inf|INF)\n        |\\.(?:nan|NaN|NAN))$',re.X),list(_E)),([(1,1)],_D,RegExp('^(?:\n         [-+]?(?:[0-9][0-9_]*)\\.[0-9_]*(?:[eE][-+]?[0-9]+)?\n        |[-+]?(?:[0-9][0-9_]*)(?:[eE][-+]?[0-9]+)\n        |\\.[0-9_]+(?:[eE][-+][0-9]+)?\n        |[-+]?[0-9][0-9_]*(?::[0-5]?[0-9])+\\.[0-9_]*  # sexagesimal float\n        |[-+]?\\.(?:inf|Inf|INF)\n        |\\.(?:nan|NaN|NAN))$',re.X),list(_E)),([(1,2)],_F,RegExp('^(?:[-+]?0b[0-1_]+\n        |[-+]?0o?[0-7_]+\n        |[-+]?[0-9_]+\n        |[-+]?0x[0-9a-fA-F_]+)$',re.X),list(_G)),([(1,1)],_F,RegExp('^(?:[-+]?0b[0-1_]+\n        |[-+]?0?[0-7_]+\n        |[-+]?(?:0|[1-9][0-9_]*)\n        |[-+]?0x[0-9a-fA-F_]+\n        |[-+]?[1-9][0-9_]*(?::[0-5]?[0-9])+)$',re.X),list(_G)),([(1,2),(1,1)],'tag:yaml.org,2002:merge',RegExp('^(?:<<)$'),['<']),([(1,2),(1,1)],'tag:yaml.org,2002:null',RegExp('^(?: ~\n        |null|Null|NULL\n        | )$',re.X),['~','n','N','']),([(1,2),(1,1)],'tag:yaml.org,2002:timestamp',RegExp('^(?:[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]\n        |[0-9][0-9][0-9][0-9] -[0-9][0-9]? -[0-9][0-9]?\n        (?:[Tt]|[ \\t]+)[0-9][0-9]?\n        :[0-9][0-9] :[0-9][0-9] (?:\\.[0-9]*)?\n        (?:[ \\t]*(?:Z|[-+][0-9][0-9]?(?::[0-9][0-9])?))?)$',re.X),list('0123456789')),([(1,2),(1,1)],'tag:yaml.org,2002:value',RegExp('^(?:=)$'),['=']),([(1,2),(1,1)],'tag:yaml.org,2002:yaml',RegExp('^(?:!|&|\\*)$'),list('!&*'))]
class ResolverError(YAMLError):
    """Raised when a path resolver is registered with an invalid element."""
class BaseResolver:
    """Resolve the tag of a YAML node from implicit and path-based rules.

    Implicit rules map the first character of a scalar to a list of
    ``(tag, regexp)`` pairs; path rules map a ``(path, kind)`` pair to a tag.
    Both tables are class attributes and are copied on first modification so
    that registering a rule on a subclass does not alter its parents.
    """

    DEFAULT_SCALAR_TAG = 'tag:yaml.org,2002:str'
    DEFAULT_SEQUENCE_TAG = 'tag:yaml.org,2002:seq'
    DEFAULT_MAPPING_TAG = 'tag:yaml.org,2002:map'

    # first character (or None as wildcard) -> [(tag, regexp), ...]
    yaml_implicit_resolvers = {}
    # (tuple of (node_check, index_check), kind) -> tag
    yaml_path_resolvers = {}

    def __init__(self, loadumper=None):
        """Store the owning loader/dumper and reset the path-tracking stacks.

        If ``loadumper`` has no ``_resolver`` attribute (or it is None), the
        attribute is set to ``loadumper`` itself.
        """
        self.loadumper = loadumper
        if self.loadumper is not None and getattr(self.loadumper, '_resolver', None) is None:
            self.loadumper._resolver = self.loadumper
        self._loader_version = None
        self.resolver_exact_paths = []
        self.resolver_prefix_paths = []

    @property
    def parser(self):
        """Return the parser of the attached loader/dumper, or None if unset.

        Objects exposing a ``typ`` attribute provide ``parser``; all others
        are expected to provide ``_parser``.
        """
        if self.loadumper is not None:
            if hasattr(self.loadumper, 'typ'):
                return self.loadumper.parser
            return self.loadumper._parser
        return None

    @classmethod
    def add_implicit_resolver_base(cls, tag, regexp, first):
        """Register ``(tag, regexp)`` on ``cls`` for each character in ``first``.

        ``first`` may be None, in which case the rule is stored under the
        wildcard key ``None``. The module-level ``implicit_resolvers`` list is
        not touched.
        """
        if 'yaml_implicit_resolvers' not in cls.__dict__:
            # Copy the inherited table (and each bucket) so the parent class
            # keeps its own rules.
            cls.yaml_implicit_resolvers = {
                key: bucket[:] for key, bucket in cls.yaml_implicit_resolvers.items()
            }
        if first is None:
            first = [None]
        for ch in first:
            cls.yaml_implicit_resolvers.setdefault(ch, []).append((tag, regexp))

    @classmethod
    def add_implicit_resolver(cls, tag, regexp, first):
        """Register a rule on ``cls`` and record it in ``implicit_resolvers``.

        Same as :meth:`add_implicit_resolver_base`, but the rule is also
        appended to the module-level table for both (1, 2) and (1, 1).
        """
        if first is None:
            first = [None]
        cls.add_implicit_resolver_base(tag, regexp, first)
        implicit_resolvers.append(([(1, 2), (1, 1)], tag, regexp, first))

    @classmethod
    def add_path_resolver(cls, tag, path, kind=None):
        """Register ``tag`` for nodes found at ``path`` with node type ``kind``.

        Each path element is either ``(node_check, index_check)``,
        ``(node_check,)`` (index check defaults to True) or a bare index
        check. ``str``/``list``/``dict`` are accepted as shorthands for
        ScalarNode/SequenceNode/MappingNode, both in the path and for ``kind``.

        Raises:
            ResolverError: for a malformed path element, node checker, index
                checker or node kind.
        """
        if 'yaml_path_resolvers' not in cls.__dict__:
            cls.yaml_path_resolvers = cls.yaml_path_resolvers.copy()
        new_path = []
        for element in path:
            if isinstance(element, (list, tuple)):
                if len(element) == 2:
                    node_check, index_check = element
                elif len(element) == 1:
                    node_check = element[0]
                    index_check = True
                else:
                    raise ResolverError('Invalid path element: %s' % (element,))
            else:
                node_check = None
                index_check = element
            if node_check is str:
                node_check = ScalarNode
            elif node_check is list:
                node_check = SequenceNode
            elif node_check is dict:
                node_check = MappingNode
            elif (
                node_check not in [ScalarNode, SequenceNode, MappingNode]
                and not isinstance(node_check, string_types)
                and node_check is not None
            ):
                raise ResolverError('Invalid node checker: %s' % (node_check,))
            if not isinstance(index_check, (string_types, int)) and index_check is not None:
                raise ResolverError('Invalid index checker: %s' % (index_check,))
            new_path.append((node_check, index_check))
        if kind is str:
            kind = ScalarNode
        elif kind is list:
            kind = SequenceNode
        elif kind is dict:
            kind = MappingNode
        elif kind not in [ScalarNode, SequenceNode, MappingNode] and kind is not None:
            raise ResolverError('Invalid node kind: %s' % (kind,))
        cls.yaml_path_resolvers[tuple(new_path), kind] = tag

    def descend_resolver(self, current_node, current_index):
        """Push the path-resolver state for one level deeper in the tree.

        Does nothing when no path resolvers are registered. With a falsy
        ``current_node`` the state is seeded from all registered paths;
        otherwise it is derived from the prefixes still matching one level up.
        """
        if not self.yaml_path_resolvers:
            return
        exact_paths = {}
        prefix_paths = []
        if current_node:
            depth = len(self.resolver_prefix_paths)
            for path, kind in self.resolver_prefix_paths[-1]:
                if self.check_resolver_prefix(depth, path, kind, current_node, current_index):
                    if len(path) > depth:
                        prefix_paths.append((path, kind))
                    else:
                        exact_paths[kind] = self.yaml_path_resolvers[path, kind]
        else:
            for path, kind in self.yaml_path_resolvers:
                if not path:
                    exact_paths[kind] = self.yaml_path_resolvers[path, kind]
                else:
                    prefix_paths.append((path, kind))
        self.resolver_exact_paths.append(exact_paths)
        self.resolver_prefix_paths.append(prefix_paths)

    def ascend_resolver(self):
        """Pop the state pushed by the matching :meth:`descend_resolver`."""
        if not self.yaml_path_resolvers:
            return
        self.resolver_exact_paths.pop()
        self.resolver_prefix_paths.pop()

    def check_resolver_prefix(self, depth, path, kind, current_node, current_index):
        """Return True if ``path[depth - 1]`` matches the current position.

        ``kind`` is accepted for signature compatibility but is not used.
        """
        node_check, index_check = path[depth - 1]
        if isinstance(node_check, string_types):
            # A string node check is compared against the node's tag.
            if current_node.tag != node_check:
                return False
        elif node_check is not None:
            if not isinstance(current_node, node_check):
                return False
        if index_check is True and current_index is not None:
            return False
        if (index_check is False or index_check is None) and current_index is None:
            return False
        if isinstance(index_check, string_types):
            if not (isinstance(current_index, ScalarNode) and index_check == current_index.value):
                return False
        elif isinstance(index_check, int) and not isinstance(index_check, bool):
            # bool is a subclass of int; True/False were handled above.
            if index_check != current_index:
                return False
        return True

    def resolve(self, kind, value, implicit):
        """Return the tag for a node of type ``kind`` holding ``value``.

        For scalars with ``implicit[0]`` set, the implicit rules registered
        for the first character of ``value`` are tried first, followed by the
        wildcard rules. Then exact path matches are consulted, and finally the
        default tag for ``kind`` is returned (None for an unknown kind).
        """
        if kind is ScalarNode and implicit[0]:
            first_char = '' if value == '' else value[0]
            specific = self.yaml_implicit_resolvers.get(first_char, [])
            wildcard = self.yaml_implicit_resolvers.get(None, [])
            # Concatenate into a new list: extending ``specific`` in place
            # would append the wildcard rules to the stored table on every
            # call, making it grow without bound.
            for tag, regexp in specific + wildcard:
                if regexp.match(value):
                    return tag
            # Kept from the original code; the value is not used afterwards.
            implicit = implicit[1]
        if bool(self.yaml_path_resolvers):
            exact_paths = self.resolver_exact_paths[-1]
            if kind in exact_paths:
                return exact_paths[kind]
            if None in exact_paths:
                return exact_paths[None]
        if kind is ScalarNode:
            return self.DEFAULT_SCALAR_TAG
        elif kind is SequenceNode:
            return self.DEFAULT_SEQUENCE_TAG
        elif kind is MappingNode:
            return self.DEFAULT_MAPPING_TAG
        return None

    @property
    def processing_version(self):
        """Return None; the base resolver does not track a YAML version."""
        return None
class Resolver(BaseResolver):
    """Resolver class on which the module registers its (1, 2) implicit rules."""
# Register every implicit rule that applies to YAML (1, 2) on Resolver.
# Each entry is (versions, tag, regexp, first); everything after the version
# list is forwarded unchanged.
for ir in implicit_resolvers:
    if (1, 2) not in ir[0]:
        continue
    Resolver.add_implicit_resolver_base(*ir[1:])
class VersionedResolver(BaseResolver):
    """Resolver whose implicit rules depend on the YAML version in effect.

    Rules are taken from the module-level ``implicit_resolvers`` table and
    cached per instance and per version the first time a version is used.
    """

    def __init__(self, version=None, loader=None, loadumper=None):
        """Initialise the resolver.

        ``loadumper`` is used as the loader when ``loader`` is not given.
        ``version`` may be None, a tuple, a list, or a dotted string such as
        ``'1.2'``; it is normalised by :meth:`get_loader_version`.
        """
        if loader is None and loadumper is not None:
            loader = loadumper
        BaseResolver.__init__(self, loader)
        self._loader_version = self.get_loader_version(version)
        # version -> {first character (or None): [(tag, regexp), ...]}
        self._version_implicit_resolver = {}

    def add_version_implicit_resolver(self, version, tag, regexp, first):
        """Register ``(tag, regexp)`` for ``version`` under each char in ``first``.

        A ``first`` of None stores the rule under the wildcard key ``None``.
        """
        if first is None:
            first = [None]
        impl_resolver = self._version_implicit_resolver.setdefault(version, {})
        for ch in first:
            impl_resolver.setdefault(ch, []).append((tag, regexp))

    def get_loader_version(self, version):
        """Normalise ``version`` to a tuple (None is passed through).

        Lists are converted with ``tuple``; any other value is treated as a
        dotted string and split into a tuple of ints.
        """
        if version is None or isinstance(version, tuple):
            return version
        if isinstance(version, list):
            return tuple(version)
        return tuple(map(int, version.split('.')))

    @property
    def versioned_resolver(self):
        """Return the implicit-rule table for the current processing version.

        The table is built on first use from every ``implicit_resolvers``
        entry that lists the version.
        NOTE(review): a version listed by no entry leaves the cache without
        that key, so the final lookup raises KeyError -- confirm whether that
        is intended.
        """
        version = self.processing_version
        if version not in self._version_implicit_resolver:
            for x in implicit_resolvers:
                if version in x[0]:
                    self.add_version_implicit_resolver(version, x[1], x[2], x[3])
        return self._version_implicit_resolver[version]

    def resolve(self, kind, value, implicit):
        """Return the tag for a node of type ``kind`` holding ``value``.

        For scalars with ``implicit[0]`` set, the version-specific rules for
        the first character of ``value`` are tried first, followed by the
        wildcard rules. Then exact path matches are consulted, and finally the
        default tag for ``kind`` is returned (None for an unknown kind).
        """
        if kind is ScalarNode and implicit[0]:
            table = self.versioned_resolver
            first_char = '' if value == '' else value[0]
            specific = table.get(first_char, [])
            wildcard = table.get(None, [])
            # Concatenate into a new list: extending ``specific`` in place
            # would append the wildcard rules to the cached per-version table
            # on every call, making it grow without bound.
            for tag, regexp in specific + wildcard:
                if regexp.match(value):
                    return tag
            # Kept from the original code; the value is not used afterwards.
            implicit = implicit[1]
        if bool(self.yaml_path_resolvers):
            exact_paths = self.resolver_exact_paths[-1]
            if kind in exact_paths:
                return exact_paths[kind]
            if None in exact_paths:
                return exact_paths[None]
        if kind is ScalarNode:
            return self.DEFAULT_SCALAR_TAG
        elif kind is SequenceNode:
            return self.DEFAULT_SEQUENCE_TAG
        elif kind is MappingNode:
            return self.DEFAULT_MAPPING_TAG
        return None

    @property
    def processing_version(self):
        """Return the YAML version to resolve against.

        Lookup order: the scanner's ``yaml_version``; then ``version`` of a
        loadumper exposing ``typ``, or the serializer's ``use_version``; then
        the version given at construction; finally ``_DEFAULT_YAML_VERSION``.
        """
        try:
            version = self.loadumper._scanner.yaml_version
        except AttributeError:
            try:
                if hasattr(self.loadumper, 'typ'):
                    version = self.loadumper.version
                else:
                    version = self.loadumper._serializer.use_version
            except AttributeError:
                version = None
        if version is None:
            version = self._loader_version
            if version is None:
                version = _DEFAULT_YAML_VERSION
        return version