1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455
|
from __future__ import absolute_import,unicode_literals,print_function
_L='_emitter'
_K='_serializer'
_J='{}.dump(_all) takes two positional argument but at least three were given ({!r})'
_I='_stream'
_H='{}.__init__() takes no positional argument but at least one was given ({!r})'
_G='yaml_tag'
_F='open'
_E='rt'
_D='_'
_C=True
_B=False
_A=None
import sys,os,warnings,glob
from importlib import import_module
import dynaconf.vendor.ruamel as ruamel
from.error import UnsafeLoaderWarning,YAMLError
from.tokens import*
from.events import*
from.nodes import*
from.loader import BaseLoader,SafeLoader,Loader,RoundTripLoader
from.dumper import BaseDumper,SafeDumper,Dumper,RoundTripDumper
from.compat import StringIO,BytesIO,with_metaclass,PY3,nprint
from.resolver import VersionedResolver,Resolver
from.representer import BaseRepresenter,SafeRepresenter,Representer,RoundTripRepresenter
from.constructor import BaseConstructor,SafeConstructor,Constructor,RoundTripConstructor
from.loader import Loader as UnsafeLoader
if _B:
from typing import List,Set,Dict,Union,Any,Callable,Optional,Text;from.compat import StreamType,StreamTextType,VersionType
if PY3:from pathlib import Path
else:Path=Any
try:from _ruamel_yaml import CParser,CEmitter
except:CParser=CEmitter=_A
enforce=object()
class YAML:
def __init__(self,_kw=enforce,typ=_A,pure=_B,output=_A,plug_ins=_A):
if _kw is not enforce:raise TypeError(_H.format(self.__class__.__name__,_kw))
self.typ=[_E]if typ is _A else typ if isinstance(typ,list)else[typ];self.pure=pure;self._output=output;self._context_manager=_A;self.plug_ins=[]
for pu in([]if plug_ins is _A else plug_ins)+self.official_plug_ins():file_name=pu.replace(os.sep,'.');self.plug_ins.append(import_module(file_name))
self.Resolver=ruamel.yaml.resolver.VersionedResolver;self.allow_unicode=_C;self.Reader=_A;self.Representer=_A;self.Constructor=_A;self.Scanner=_A;self.Serializer=_A;self.default_flow_style=_A;typ_found=1;setup_rt=_B
if _E in self.typ:setup_rt=_C
elif'safe'in self.typ:self.Emitter=ruamel.yaml.emitter.Emitter if pure or CEmitter is _A else CEmitter;self.Representer=ruamel.yaml.representer.SafeRepresenter;self.Parser=ruamel.yaml.parser.Parser if pure or CParser is _A else CParser;self.Composer=ruamel.yaml.composer.Composer;self.Constructor=ruamel.yaml.constructor.SafeConstructor
elif'base'in self.typ:self.Emitter=ruamel.yaml.emitter.Emitter;self.Representer=ruamel.yaml.representer.BaseRepresenter;self.Parser=ruamel.yaml.parser.Parser if pure or CParser is _A else CParser;self.Composer=ruamel.yaml.composer.Composer;self.Constructor=ruamel.yaml.constructor.BaseConstructor
elif'unsafe'in self.typ:self.Emitter=ruamel.yaml.emitter.Emitter if pure or CEmitter is _A else CEmitter;self.Representer=ruamel.yaml.representer.Representer;self.Parser=ruamel.yaml.parser.Parser if pure or CParser is _A else CParser;self.Composer=ruamel.yaml.composer.Composer;self.Constructor=ruamel.yaml.constructor.Constructor
else:setup_rt=_C;typ_found=0
if setup_rt:self.default_flow_style=_B;self.Emitter=ruamel.yaml.emitter.Emitter;self.Serializer=ruamel.yaml.serializer.Serializer;self.Representer=ruamel.yaml.representer.RoundTripRepresenter;self.Scanner=ruamel.yaml.scanner.RoundTripScanner;self.Parser=ruamel.yaml.parser.RoundTripParser;self.Composer=ruamel.yaml.composer.Composer;self.Constructor=ruamel.yaml.constructor.RoundTripConstructor
del setup_rt;self.stream=_A;self.canonical=_A;self.old_indent=_A;self.width=_A;self.line_break=_A;self.map_indent=_A;self.sequence_indent=_A;self.sequence_dash_offset=0;self.compact_seq_seq=_A;self.compact_seq_map=_A;self.sort_base_mapping_type_on_output=_A;self.top_level_colon_align=_A;self.prefix_colon=_A;self.version=_A;self.preserve_quotes=_A;self.allow_duplicate_keys=_B;self.encoding='utf-8';self.explicit_start=_A;self.explicit_end=_A;self.tags=_A;self.default_style=_A;self.top_level_block_style_scalar_no_indent_error_1_1=_B;self.scalar_after_indicator=_A;self.brace_single_entry_mapping_in_flow_sequence=_B
for module in self.plug_ins:
if getattr(module,'typ',_A)in self.typ:typ_found+=1;module.init_typ(self);break
if typ_found==0:raise NotImplementedError('typ "{}"not recognised (need to install plug-in?)'.format(self.typ))
@property
def reader(self):
try:return self._reader
except AttributeError:self._reader=self.Reader(_A,loader=self);return self._reader
@property
def scanner(self):
try:return self._scanner
except AttributeError:self._scanner=self.Scanner(loader=self);return self._scanner
@property
def parser(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):
if self.Parser is not CParser:setattr(self,attr,self.Parser(loader=self))
elif getattr(self,_I,_A)is _A:return
else:setattr(self,attr,CParser(self._stream))
return getattr(self,attr)
@property
def composer(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):setattr(self,attr,self.Composer(loader=self))
return getattr(self,attr)
@property
def constructor(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):cnst=self.Constructor(preserve_quotes=self.preserve_quotes,loader=self);cnst.allow_duplicate_keys=self.allow_duplicate_keys;setattr(self,attr,cnst)
return getattr(self,attr)
@property
def resolver(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):setattr(self,attr,self.Resolver(version=self.version,loader=self))
return getattr(self,attr)
@property
def emitter(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):
if self.Emitter is not CEmitter:
_emitter=self.Emitter(_A,canonical=self.canonical,indent=self.old_indent,width=self.width,allow_unicode=self.allow_unicode,line_break=self.line_break,prefix_colon=self.prefix_colon,brace_single_entry_mapping_in_flow_sequence=self.brace_single_entry_mapping_in_flow_sequence,dumper=self);setattr(self,attr,_emitter)
if self.map_indent is not _A:_emitter.best_map_indent=self.map_indent
if self.sequence_indent is not _A:_emitter.best_sequence_indent=self.sequence_indent
if self.sequence_dash_offset is not _A:_emitter.sequence_dash_offset=self.sequence_dash_offset
if self.compact_seq_seq is not _A:_emitter.compact_seq_seq=self.compact_seq_seq
if self.compact_seq_map is not _A:_emitter.compact_seq_map=self.compact_seq_map
else:
if getattr(self,_I,_A)is _A:return
return
return getattr(self,attr)
@property
def serializer(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):setattr(self,attr,self.Serializer(encoding=self.encoding,explicit_start=self.explicit_start,explicit_end=self.explicit_end,version=self.version,tags=self.tags,dumper=self))
return getattr(self,attr)
@property
def representer(self):
attr=_D+sys._getframe().f_code.co_name
if not hasattr(self,attr):
repres=self.Representer(default_style=self.default_style,default_flow_style=self.default_flow_style,dumper=self)
if self.sort_base_mapping_type_on_output is not _A:repres.sort_base_mapping_type_on_output=self.sort_base_mapping_type_on_output
setattr(self,attr,repres)
return getattr(self,attr)
def load(self,stream):
if not hasattr(stream,'read')and hasattr(stream,_F):
with stream.open('rb')as fp:return self.load(fp)
constructor,parser=self.get_constructor_parser(stream)
try:return constructor.get_single_data()
finally:
parser.dispose()
try:self._reader.reset_reader()
except AttributeError:pass
try:self._scanner.reset_scanner()
except AttributeError:pass
def load_all(self,stream,_kw=enforce):
if _kw is not enforce:raise TypeError(_H.format(self.__class__.__name__,_kw))
if not hasattr(stream,'read')and hasattr(stream,_F):
with stream.open('r')as fp:
for d in self.load_all(fp,_kw=enforce):yield d
return
constructor,parser=self.get_constructor_parser(stream)
try:
while constructor.check_data():yield constructor.get_data()
finally:
parser.dispose()
try:self._reader.reset_reader()
except AttributeError:pass
try:self._scanner.reset_scanner()
except AttributeError:pass
def get_constructor_parser(self,stream):
if self.Parser is not CParser:
if self.Reader is _A:self.Reader=ruamel.yaml.reader.Reader
if self.Scanner is _A:self.Scanner=ruamel.yaml.scanner.Scanner
self.reader.stream=stream
elif self.Reader is not _A:
if self.Scanner is _A:self.Scanner=ruamel.yaml.scanner.Scanner
self.Parser=ruamel.yaml.parser.Parser;self.reader.stream=stream
elif self.Scanner is not _A:
if self.Reader is _A:self.Reader=ruamel.yaml.reader.Reader
self.Parser=ruamel.yaml.parser.Parser;self.reader.stream=stream
else:
rslvr=self.Resolver
class XLoader(self.Parser,self.Constructor,rslvr):
def __init__(selfx,stream,version=self.version,preserve_quotes=_A):CParser.__init__(selfx,stream);selfx._parser=selfx._composer=selfx;self.Constructor.__init__(selfx,loader=selfx);selfx.allow_duplicate_keys=self.allow_duplicate_keys;rslvr.__init__(selfx,version=version,loadumper=selfx)
self._stream=stream;loader=XLoader(stream);return loader,loader
return self.constructor,self.parser
def dump(self,data,stream=_A,_kw=enforce,transform=_A):
if self._context_manager:
if not self._output:raise TypeError('Missing output stream while dumping from context manager')
if _kw is not enforce:raise TypeError('{}.dump() takes one positional argument but at least two were given ({!r})'.format(self.__class__.__name__,_kw))
if transform is not _A:raise TypeError('{}.dump() in the context manager cannot have transform keyword '.format(self.__class__.__name__))
self._context_manager.dump(data)
else:
if stream is _A:raise TypeError('Need a stream argument when not dumping from context manager')
return self.dump_all([data],stream,_kw,transform=transform)
def dump_all(self,documents,stream,_kw=enforce,transform=_A):
if self._context_manager:raise NotImplementedError
if _kw is not enforce:raise TypeError(_J.format(self.__class__.__name__,_kw))
self._output=stream;self._context_manager=YAMLContextManager(self,transform=transform)
for data in documents:self._context_manager.dump(data)
self._context_manager.teardown_output();self._output=_A;self._context_manager=_A
def Xdump_all(self,documents,stream,_kw=enforce,transform=_A):
if not hasattr(stream,'write')and hasattr(stream,_F):
with stream.open('w')as fp:return self.dump_all(documents,fp,_kw,transform=transform)
if _kw is not enforce:raise TypeError(_J.format(self.__class__.__name__,_kw))
if self.top_level_colon_align is _C:tlca=max([len(str(x))for x in documents[0]])
else:tlca=self.top_level_colon_align
if transform is not _A:
fstream=stream
if self.encoding is _A:stream=StringIO()
else:stream=BytesIO()
serializer,representer,emitter=self.get_serializer_representer_emitter(stream,tlca)
try:
self.serializer.open()
for data in documents:
try:self.representer.represent(data)
except AttributeError:raise
self.serializer.close()
finally:
try:self.emitter.dispose()
except AttributeError:raise
delattr(self,_K);delattr(self,_L)
if transform:
val=stream.getvalue()
if self.encoding:val=val.decode(self.encoding)
if fstream is _A:transform(val)
else:fstream.write(transform(val))
def get_serializer_representer_emitter(self,stream,tlca):
if self.Emitter is not CEmitter:
if self.Serializer is _A:self.Serializer=ruamel.yaml.serializer.Serializer
self.emitter.stream=stream;self.emitter.top_level_colon_align=tlca
if self.scalar_after_indicator is not _A:self.emitter.scalar_after_indicator=self.scalar_after_indicator
return self.serializer,self.representer,self.emitter
if self.Serializer is not _A:
self.Emitter=ruamel.yaml.emitter.Emitter;self.emitter.stream=stream;self.emitter.top_level_colon_align=tlca
if self.scalar_after_indicator is not _A:self.emitter.scalar_after_indicator=self.scalar_after_indicator
return self.serializer,self.representer,self.emitter
rslvr=ruamel.yaml.resolver.BaseResolver if'base'in self.typ else ruamel.yaml.resolver.Resolver
class XDumper(CEmitter,self.Representer,rslvr):
def __init__(selfx,stream,default_style=_A,default_flow_style=_A,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A,encoding=_A,explicit_start=_A,explicit_end=_A,version=_A,tags=_A,block_seq_indent=_A,top_level_colon_align=_A,prefix_colon=_A):CEmitter.__init__(selfx,stream,canonical=canonical,indent=indent,width=width,encoding=encoding,allow_unicode=allow_unicode,line_break=line_break,explicit_start=explicit_start,explicit_end=explicit_end,version=version,tags=tags);selfx._emitter=selfx._serializer=selfx._representer=selfx;self.Representer.__init__(selfx,default_style=default_style,default_flow_style=default_flow_style);rslvr.__init__(selfx)
self._stream=stream;dumper=XDumper(stream,default_style=self.default_style,default_flow_style=self.default_flow_style,canonical=self.canonical,indent=self.old_indent,width=self.width,allow_unicode=self.allow_unicode,line_break=self.line_break,explicit_start=self.explicit_start,explicit_end=self.explicit_end,version=self.version,tags=self.tags);self._emitter=self._serializer=dumper;return dumper,dumper,dumper
def map(self,**kw):
if _E in self.typ:from dynaconf.vendor.ruamel.yaml.comments import CommentedMap;return CommentedMap(**kw)
else:return dict(**kw)
def seq(self,*args):
if _E in self.typ:from dynaconf.vendor.ruamel.yaml.comments import CommentedSeq;return CommentedSeq(*args)
else:return list(*args)
def official_plug_ins(self):bd=os.path.dirname(__file__);gpbd=os.path.dirname(os.path.dirname(bd));res=[x.replace(gpbd,'')[1:-3]for x in glob.glob(bd+'/*/__plug_in__.py')];return res
def register_class(self,cls):
tag=getattr(cls,_G,'!'+cls.__name__)
try:self.representer.add_representer(cls,cls.to_yaml)
except AttributeError:
def t_y(representer,data):return representer.represent_yaml_object(tag,data,cls,flow_style=representer.default_flow_style)
self.representer.add_representer(cls,t_y)
try:self.constructor.add_constructor(tag,cls.from_yaml)
except AttributeError:
def f_y(constructor,node):return constructor.construct_yaml_object(node,cls)
self.constructor.add_constructor(tag,f_y)
return cls
def parse(self,stream):
_,parser=self.get_constructor_parser(stream)
try:
while parser.check_event():yield parser.get_event()
finally:
parser.dispose()
try:self._reader.reset_reader()
except AttributeError:pass
try:self._scanner.reset_scanner()
except AttributeError:pass
def __enter__(self):self._context_manager=YAMLContextManager(self);return self
def __exit__(self,typ,value,traceback):
if typ:nprint('typ',typ)
self._context_manager.teardown_output();self._context_manager=_A
def _indent(self,mapping=_A,sequence=_A,offset=_A):
if mapping is not _A:self.map_indent=mapping
if sequence is not _A:self.sequence_indent=sequence
if offset is not _A:self.sequence_dash_offset=offset
@property
def indent(self):return self._indent
@indent.setter
def indent(self,val):self.old_indent=val
@property
def block_seq_indent(self):return self.sequence_dash_offset
@block_seq_indent.setter
def block_seq_indent(self,val):self.sequence_dash_offset=val
def compact(self,seq_seq=_A,seq_map=_A):self.compact_seq_seq=seq_seq;self.compact_seq_map=seq_map
class YAMLContextManager:
def __init__(self,yaml,transform=_A):
self._yaml=yaml;self._output_inited=_B;self._output_path=_A;self._output=self._yaml._output;self._transform=transform
if not hasattr(self._output,'write')and hasattr(self._output,_F):self._output_path=self._output;self._output=self._output_path.open('w')
if self._transform is not _A:
self._fstream=self._output
if self._yaml.encoding is _A:self._output=StringIO()
else:self._output=BytesIO()
def teardown_output(self):
if self._output_inited:self._yaml.serializer.close()
else:return
try:self._yaml.emitter.dispose()
except AttributeError:raise
try:delattr(self._yaml,_K);delattr(self._yaml,_L)
except AttributeError:raise
if self._transform:
val=self._output.getvalue()
if self._yaml.encoding:val=val.decode(self._yaml.encoding)
if self._fstream is _A:self._transform(val)
else:self._fstream.write(self._transform(val));self._fstream.flush();self._output=self._fstream
if self._output_path is not _A:self._output.close()
def init_output(self,first_data):
if self._yaml.top_level_colon_align is _C:tlca=max([len(str(x))for x in first_data])
else:tlca=self._yaml.top_level_colon_align
self._yaml.get_serializer_representer_emitter(self._output,tlca);self._yaml.serializer.open();self._output_inited=_C
def dump(self,data):
if not self._output_inited:self.init_output(data)
try:self._yaml.representer.represent(data)
except AttributeError:raise
def yaml_object(yml):
def yo_deco(cls):
tag=getattr(cls,_G,'!'+cls.__name__)
try:yml.representer.add_representer(cls,cls.to_yaml)
except AttributeError:
def t_y(representer,data):return representer.represent_yaml_object(tag,data,cls,flow_style=representer.default_flow_style)
yml.representer.add_representer(cls,t_y)
try:yml.constructor.add_constructor(tag,cls.from_yaml)
except AttributeError:
def f_y(constructor,node):return constructor.construct_yaml_object(node,cls)
yml.constructor.add_constructor(tag,f_y)
return cls
return yo_deco
def scan(stream,Loader=Loader):
loader=Loader(stream)
try:
while loader.scanner.check_token():yield loader.scanner.get_token()
finally:loader._parser.dispose()
def parse(stream,Loader=Loader):
loader=Loader(stream)
try:
while loader._parser.check_event():yield loader._parser.get_event()
finally:loader._parser.dispose()
def compose(stream,Loader=Loader):
loader=Loader(stream)
try:return loader.get_single_node()
finally:loader.dispose()
def compose_all(stream,Loader=Loader):
loader=Loader(stream)
try:
while loader.check_node():yield loader._composer.get_node()
finally:loader._parser.dispose()
def load(stream,Loader=_A,version=_A,preserve_quotes=_A):
if Loader is _A:warnings.warn(UnsafeLoaderWarning.text,UnsafeLoaderWarning,stacklevel=2);Loader=UnsafeLoader
loader=Loader(stream,version,preserve_quotes=preserve_quotes)
try:return loader._constructor.get_single_data()
finally:
loader._parser.dispose()
try:loader._reader.reset_reader()
except AttributeError:pass
try:loader._scanner.reset_scanner()
except AttributeError:pass
def load_all(stream,Loader=_A,version=_A,preserve_quotes=_A):
if Loader is _A:warnings.warn(UnsafeLoaderWarning.text,UnsafeLoaderWarning,stacklevel=2);Loader=UnsafeLoader
loader=Loader(stream,version,preserve_quotes=preserve_quotes)
try:
while loader._constructor.check_data():yield loader._constructor.get_data()
finally:
loader._parser.dispose()
try:loader._reader.reset_reader()
except AttributeError:pass
try:loader._scanner.reset_scanner()
except AttributeError:pass
def safe_load(stream,version=_A):return load(stream,SafeLoader,version)
def safe_load_all(stream,version=_A):return load_all(stream,SafeLoader,version)
def round_trip_load(stream,version=_A,preserve_quotes=_A):return load(stream,RoundTripLoader,version,preserve_quotes=preserve_quotes)
def round_trip_load_all(stream,version=_A,preserve_quotes=_A):return load_all(stream,RoundTripLoader,version,preserve_quotes=preserve_quotes)
def emit(events,stream=_A,Dumper=Dumper,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A):
getvalue=_A
if stream is _A:stream=StringIO();getvalue=stream.getvalue
dumper=Dumper(stream,canonical=canonical,indent=indent,width=width,allow_unicode=allow_unicode,line_break=line_break)
try:
for event in events:dumper.emit(event)
finally:
try:dumper._emitter.dispose()
except AttributeError:raise;dumper.dispose()
if getvalue is not _A:return getvalue()
enc=_A if PY3 else'utf-8'
def serialize_all(nodes,stream=_A,Dumper=Dumper,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A,encoding=enc,explicit_start=_A,explicit_end=_A,version=_A,tags=_A):
getvalue=_A
if stream is _A:
if encoding is _A:stream=StringIO()
else:stream=BytesIO()
getvalue=stream.getvalue
dumper=Dumper(stream,canonical=canonical,indent=indent,width=width,allow_unicode=allow_unicode,line_break=line_break,encoding=encoding,version=version,tags=tags,explicit_start=explicit_start,explicit_end=explicit_end)
try:
dumper._serializer.open()
for node in nodes:dumper.serialize(node)
dumper._serializer.close()
finally:
try:dumper._emitter.dispose()
except AttributeError:raise;dumper.dispose()
if getvalue is not _A:return getvalue()
def serialize(node,stream=_A,Dumper=Dumper,**kwds):return serialize_all([node],stream,Dumper=Dumper,**kwds)
def dump_all(documents,stream=_A,Dumper=Dumper,default_style=_A,default_flow_style=_A,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A,encoding=enc,explicit_start=_A,explicit_end=_A,version=_A,tags=_A,block_seq_indent=_A,top_level_colon_align=_A,prefix_colon=_A):
getvalue=_A
if top_level_colon_align is _C:top_level_colon_align=max([len(str(x))for x in documents[0]])
if stream is _A:
if encoding is _A:stream=StringIO()
else:stream=BytesIO()
getvalue=stream.getvalue
dumper=Dumper(stream,default_style=default_style,default_flow_style=default_flow_style,canonical=canonical,indent=indent,width=width,allow_unicode=allow_unicode,line_break=line_break,encoding=encoding,explicit_start=explicit_start,explicit_end=explicit_end,version=version,tags=tags,block_seq_indent=block_seq_indent,top_level_colon_align=top_level_colon_align,prefix_colon=prefix_colon)
try:
dumper._serializer.open()
for data in documents:
try:dumper._representer.represent(data)
except AttributeError:raise
dumper._serializer.close()
finally:
try:dumper._emitter.dispose()
except AttributeError:raise;dumper.dispose()
if getvalue is not _A:return getvalue()
def dump(data,stream=_A,Dumper=Dumper,default_style=_A,default_flow_style=_A,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A,encoding=enc,explicit_start=_A,explicit_end=_A,version=_A,tags=_A,block_seq_indent=_A):return dump_all([data],stream,Dumper=Dumper,default_style=default_style,default_flow_style=default_flow_style,canonical=canonical,indent=indent,width=width,allow_unicode=allow_unicode,line_break=line_break,encoding=encoding,explicit_start=explicit_start,explicit_end=explicit_end,version=version,tags=tags,block_seq_indent=block_seq_indent)
def safe_dump_all(documents,stream=_A,**kwds):return dump_all(documents,stream,Dumper=SafeDumper,**kwds)
def safe_dump(data,stream=_A,**kwds):return dump_all([data],stream,Dumper=SafeDumper,**kwds)
def round_trip_dump(data,stream=_A,Dumper=RoundTripDumper,default_style=_A,default_flow_style=_A,canonical=_A,indent=_A,width=_A,allow_unicode=_A,line_break=_A,encoding=enc,explicit_start=_A,explicit_end=_A,version=_A,tags=_A,block_seq_indent=_A,top_level_colon_align=_A,prefix_colon=_A):allow_unicode=_C if allow_unicode is _A else allow_unicode;return dump_all([data],stream,Dumper=Dumper,default_style=default_style,default_flow_style=default_flow_style,canonical=canonical,indent=indent,width=width,allow_unicode=allow_unicode,line_break=line_break,encoding=encoding,explicit_start=explicit_start,explicit_end=explicit_end,version=version,tags=tags,block_seq_indent=block_seq_indent,top_level_colon_align=top_level_colon_align,prefix_colon=prefix_colon)
def add_implicit_resolver(tag,regexp,first=_A,Loader=_A,Dumper=_A,resolver=Resolver):
A='add_implicit_resolver'
if Loader is _A and Dumper is _A:resolver.add_implicit_resolver(tag,regexp,first);return
if Loader:
if hasattr(Loader,A):Loader.add_implicit_resolver(tag,regexp,first)
elif issubclass(Loader,(BaseLoader,SafeLoader,ruamel.yaml.loader.Loader,RoundTripLoader)):Resolver.add_implicit_resolver(tag,regexp,first)
else:raise NotImplementedError
if Dumper:
if hasattr(Dumper,A):Dumper.add_implicit_resolver(tag,regexp,first)
elif issubclass(Dumper,(BaseDumper,SafeDumper,ruamel.yaml.dumper.Dumper,RoundTripDumper)):Resolver.add_implicit_resolver(tag,regexp,first)
else:raise NotImplementedError
def add_path_resolver(tag,path,kind=_A,Loader=_A,Dumper=_A,resolver=Resolver):
A='add_path_resolver'
if Loader is _A and Dumper is _A:resolver.add_path_resolver(tag,path,kind);return
if Loader:
if hasattr(Loader,A):Loader.add_path_resolver(tag,path,kind)
elif issubclass(Loader,(BaseLoader,SafeLoader,ruamel.yaml.loader.Loader,RoundTripLoader)):Resolver.add_path_resolver(tag,path,kind)
else:raise NotImplementedError
if Dumper:
if hasattr(Dumper,A):Dumper.add_path_resolver(tag,path,kind)
elif issubclass(Dumper,(BaseDumper,SafeDumper,ruamel.yaml.dumper.Dumper,RoundTripDumper)):Resolver.add_path_resolver(tag,path,kind)
else:raise NotImplementedError
def add_constructor(tag,object_constructor,Loader=_A,constructor=Constructor):
if Loader is _A:constructor.add_constructor(tag,object_constructor)
else:
if hasattr(Loader,'add_constructor'):Loader.add_constructor(tag,object_constructor);return
if issubclass(Loader,BaseLoader):BaseConstructor.add_constructor(tag,object_constructor)
elif issubclass(Loader,SafeLoader):SafeConstructor.add_constructor(tag,object_constructor)
elif issubclass(Loader,Loader):Constructor.add_constructor(tag,object_constructor)
elif issubclass(Loader,RoundTripLoader):RoundTripConstructor.add_constructor(tag,object_constructor)
else:raise NotImplementedError
def add_multi_constructor(tag_prefix,multi_constructor,Loader=_A,constructor=Constructor):
if Loader is _A:constructor.add_multi_constructor(tag_prefix,multi_constructor)
else:
if _B and hasattr(Loader,'add_multi_constructor'):Loader.add_multi_constructor(tag_prefix,constructor);return
if issubclass(Loader,BaseLoader):BaseConstructor.add_multi_constructor(tag_prefix,multi_constructor)
elif issubclass(Loader,SafeLoader):SafeConstructor.add_multi_constructor(tag_prefix,multi_constructor)
elif issubclass(Loader,ruamel.yaml.loader.Loader):Constructor.add_multi_constructor(tag_prefix,multi_constructor)
elif issubclass(Loader,RoundTripLoader):RoundTripConstructor.add_multi_constructor(tag_prefix,multi_constructor)
else:raise NotImplementedError
def add_representer(data_type,object_representer,Dumper=_A,representer=Representer):
if Dumper is _A:representer.add_representer(data_type,object_representer)
else:
if hasattr(Dumper,'add_representer'):Dumper.add_representer(data_type,object_representer);return
if issubclass(Dumper,BaseDumper):BaseRepresenter.add_representer(data_type,object_representer)
elif issubclass(Dumper,SafeDumper):SafeRepresenter.add_representer(data_type,object_representer)
elif issubclass(Dumper,Dumper):Representer.add_representer(data_type,object_representer)
elif issubclass(Dumper,RoundTripDumper):RoundTripRepresenter.add_representer(data_type,object_representer)
else:raise NotImplementedError
def add_multi_representer(data_type,multi_representer,Dumper=_A,representer=Representer):
if Dumper is _A:representer.add_multi_representer(data_type,multi_representer)
else:
if hasattr(Dumper,'add_multi_representer'):Dumper.add_multi_representer(data_type,multi_representer);return
if issubclass(Dumper,BaseDumper):BaseRepresenter.add_multi_representer(data_type,multi_representer)
elif issubclass(Dumper,SafeDumper):SafeRepresenter.add_multi_representer(data_type,multi_representer)
elif issubclass(Dumper,Dumper):Representer.add_multi_representer(data_type,multi_representer)
elif issubclass(Dumper,RoundTripDumper):RoundTripRepresenter.add_multi_representer(data_type,multi_representer)
else:raise NotImplementedError
class YAMLObjectMetaclass(type):
def __init__(cls,name,bases,kwds):
super(YAMLObjectMetaclass,cls).__init__(name,bases,kwds)
if _G in kwds and kwds[_G]is not _A:cls.yaml_constructor.add_constructor(cls.yaml_tag,cls.from_yaml);cls.yaml_representer.add_representer(cls,cls.to_yaml)
class YAMLObject(with_metaclass(YAMLObjectMetaclass)):
__slots__=();yaml_constructor=Constructor;yaml_representer=Representer;yaml_tag=_A;yaml_flow_style=_A
@classmethod
def from_yaml(cls,constructor,node):return constructor.construct_yaml_object(node,cls)
@classmethod
def to_yaml(cls,representer,data):return representer.represent_yaml_object(cls.yaml_tag,data,cls,flow_style=cls.yaml_flow_style)
|