File: __init__.py

package info (click to toggle)
python-pretty-yaml 25.7.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 204 kB
  • sloc: python: 1,100; makefile: 3
file content (295 lines) | stat: -rw-r--r-- 12,919 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
import os, sys, io, re, string, warnings, enum, pathlib, collections as cs

import yaml


# Values accepted by sort_dicts= option of PYAMLDumper:
#  none - passed to PyYAML as sort_keys=False
#  keys - passed to PyYAML as sort_keys=True
#  oneline_group - sort_keys=False, with PYAMLDumper.represent_mapping doing its own sorting
PYAMLSort = enum.Enum('PYAMLSort', 'none keys oneline_group')

class PYAMLDumper(yaml.dumper.SafeDumper):
	'''PyYAML SafeDumper subclass, overriding serializer/emitter/representer
			methods to change how output YAML is laid out.
		Extra keyword arguments, stored as pyaml_* attributes on the instance:
			sort_dicts - PYAMLSort value, or None/bool that is passed on as PyYAML sort_keys.
			force_embed - when true, serialize_node resets anchor/alias tracking for every node.
			string_val_style - style to use for string values (mapping keys are forced to plain).
			anchor_len_max - length limit for anchor names that are built from mapping keys.
			repr_unknown - dump a short repr() of unsupported values instead of raising error,
				non-bool value is used as a length to which such repr gets truncated.'''

	# str subtype with one extra attribute, used to carry a trailing "# ..." comment for a value
	class str_ext(str): __slots__ = 'ext',
	pyaml_anchor_decode = None # imported from unidecode module when needed
	pyaml_sort_dicts = pyaml_repr_unknown = None

	def __init__( self, *args, sort_dicts=None, force_embed=True,
			string_val_style=None, anchor_len_max=40, repr_unknown=False, **kws ):
		'Store pyaml-specific options and translate sort_dicts into PyYAML sort_keys keyword.'
		self.pyaml_force_embed = force_embed
		self.pyaml_string_val_style = string_val_style
		self.pyaml_anchor_len_max = anchor_len_max
		self.pyaml_repr_unknown = repr_unknown
		if isinstance(sort_dicts, PYAMLSort):
			if sort_dicts is sort_dicts.none: kws['sort_keys'] = False
			elif sort_dicts is sort_dicts.keys: kws['sort_keys'] = True
			# oneline_group - PyYAML sorting is disabled, represent_mapping sorts instead
			else: self.pyaml_sort_dicts, kws['sort_keys'] = sort_dicts, False
		elif sort_dicts is not None: kws['sort_keys'] = sort_dicts # for compatibility
		return super().__init__(*args, **kws)

	@staticmethod
	def pyaml_transliterate(s):
		'''Convert string to a lowercase [-_a-z0-9] slug, for use in anchor names.
			Returns (slug, flag) tuple, where flag is False for all-ASCII input,
				None if non-ASCII input was converted by unidecode module,
				or True if input is non-ASCII and unidecode module could not be imported.'''
		if unidecode_missing := not all(ord(c) < 128 for c in s):
			# Import result is cached on the class - function or False if module is unavailable
			if (unidecode := PYAMLDumper.pyaml_anchor_decode) is None:
				try: from unidecode import unidecode
				except ImportError: unidecode = False
				PYAMLDumper.pyaml_anchor_decode = unidecode
			if unidecode: unidecode_missing, s = None, unidecode(s)
		return re.sub(r'[^-_a-z0-9]+', '_', s.lower()), unidecode_missing

	def anchor_node(self, node, hints=None):
		'''Track nodes in self.anchors, assigning anchor names to the ones seen more than once.
			hints is a list of mapping-key nodes leading to this node,
				values of which are used to build a readable anchor name.
			Anchor names are only assigned when force_embed option is disabled.'''
		if hints is None: hints = list() # not a default value, to avoid sharing one list between calls
		if node in self.anchors:
			if self.anchors[node] is None and not self.pyaml_force_embed:
				if hints:
					nid, uc = self.pyaml_transliterate('_-_'.join(h.value for h in hints))
					# Too long names are cut in the middle, with generated anchor-id appended
					if len(nid) > (n := self.pyaml_anchor_len_max - 9) + 9:
						nid = f'{nid[:n//2]}-_-{nid[-n//2:]}_{self.generate_anchor(node)}'
					# Non-ASCII chars were replaced without unidecode, so add generated id as well
					elif uc is True: nid = f'{nid}_{self.generate_anchor(node)}'
				else: nid = self.generate_anchor(node)
				self.anchors[node] = nid
		else:
			self.anchors[node] = None
			if isinstance(node, yaml.nodes.SequenceNode):
				for item in node.value: self.anchor_node(item)
			elif isinstance(node, yaml.nodes.MappingNode):
				for key, value in node.value:
					self.anchor_node(key)
					self.anchor_node(value, hints=hints+[key])

	def serialize_node(self, node, parent, index):
		'Same as in parent class, but with force_embed it resets anchor and serialized-nodes state first.'
		# clear() result (None) is used as anchor value here, so that it does both resets in one line
		if self.pyaml_force_embed: self.anchors[node] = self.serialized_nodes.clear()
		return super().serialize_node(node, parent, index)

	def expect_block_sequence(self):
		'Start block sequence, always passing indentless=False to increase_indent.'
		self.increase_indent(flow=False, indentless=False)
		self.state = self.expect_first_block_sequence_item

	def expect_block_sequence_item(self, first=False):
		'Emit "-" indicator for next sequence item, or restore indent/state at the end of sequence.'
		if not first and isinstance(self.event, yaml.events.SequenceEndEvent):
			self.indent = self.indents.pop()
			self.state = self.states.pop()
		else:
			self.write_indent()
			self.write_indicator('-', True, indention=True)
			self.states.append(self.expect_block_sequence_item)
			self.expect_node(sequence=True)

	def check_simple_key(self):
		'Return parent-class result, also setting allow_flow_plain=False on scalar analysis, if any.'
		res = super().check_simple_key()
		if self.analysis: self.analysis.allow_flow_plain = False
		return res

	def choose_scalar_style(self, _re1=re.compile(r':(\s|$)')):
		'''Pick style for current scalar event. _re1 is a pre-compiled regexp, not meant to be passed.
			Returns None for plain style, "'" for strings that look like list/mapping items,
				or whatever parent class picks for events with style other than "plain".'''
		if self.states[-1] == self.expect_block_mapping_simple_value:
			# Mapping keys - disable overriding string style, strip comments
			if self.pyaml_string_val_style: self.event.style = 'plain'
			if isinstance(self.analysis.scalar, self.str_ext):
				self.analysis.scalar = str(self.event.value)
		# Do default thing for complicated stuff
		if self.event.style != 'plain': return super().choose_scalar_style()
		# Make sure style isn't overridden for strings like list/mapping items
		if (s := self.event.value).startswith('- ') or _re1.search(s): return "'"
		# Returned style=None picks write_plain in Emitter.process_scalar

	def write_indicator(self, indicator, *args, **kws):
		'Same as in parent class, except that "..." indicator is never written.'
		if indicator == '...': return # presumably it's useful somewhere, but don't care
		super().write_indicator(indicator, *args, **kws)

	def represent_str(self, data):
		'''Return str ScalarNode with string_val_style option as its style, if that is set.
			Otherwise strings with a newline before the last character get "|" style
				if all their lines fit into best_width, or "literal" style value if not.'''
		if not (style := self.pyaml_string_val_style):
			if '\n' in data[:-1]:
				# NOTE(review): "literal" does not look like a PyYAML style character,
				#  so presumably emitter falls back to its own choice for it - confirm
				style = 'literal'
				for line in data.splitlines():
					if len(line) > self.best_width: break
				else: style = '|'
		return yaml.representer.ScalarNode('tag:yaml.org,2002:str', data, style=style)

	def represent_mapping_sort_oneline(self, kv):
		'''Sorting key-func for (key, value) items, used with PYAMLSort.oneline_group.
			Returns (v, k, key) tuple, where v=1 for empty/numeric/single-line-string
				values and v=2 for anything else, k is a rank for the type of the key,
				and key is either original key or its string form for less common types.'''
		key, value = kv
		if not value or isinstance(value, (int, float)): v = 1
		elif isinstance(value, str) and '\n' not in value: v = 1
		else: v = 2
		if isinstance(key, (int, float)): k = 1
		elif isinstance(key, str): k = 2
		elif key is None: k = 4
		else: k, key = 3, f'{type(key)}\0{key}' # best-effort sort for all other types
		return v, k, key

	def represent_mapping(self, tag, mapping, *args, **kws):
		'Same as in parent class, but sorts mapping items first with oneline_group sort_dicts option.'
		if self.pyaml_sort_dicts is PYAMLSort.oneline_group:
			try:
				mapping = dict(sorted( mapping.items(),
					key=self.represent_mapping_sort_oneline ))
			except TypeError: pass # for subtype comparison fails
		return super().represent_mapping(tag, mapping, *args, **kws)

	def represent_undefined(self, data):
		'''Fallback representer for values of types that have no other representer.
			Tries following conversions in order: namedtuple and Mapping to dict,
				enum to its value with a comment, dataclass to dict, tolist() method result,
				short repr() string with a comment - if repr_unknown option is enabled.
			If none of these apply, parent-class method is called to raise RepresenterError.'''
		if isinstance(data, tuple) and hasattr(data, '_make') and hasattr(data, '_asdict'):
			return self.represent_dict(data._asdict()) # assuming namedtuple
		if isinstance(data, cs.abc.Mapping): return self.represent_dict(data) # dict-like
		if type(data).__class__.__module__ == 'enum': # metaclass is from enum module
			node = self.represent_data(data.value)
			node.value = self.str_ext(node.value)
			node.value.ext = f'# {data.__class__.__name__}.{data.name}'
			return node
		if hasattr(type(data), '__dataclass_fields__'):
			try: import dataclasses as dcs
			except ImportError: pass # can still be something else
			else: return self.represent_dict(dcs.asdict(data))
		try: # this is for numpy arrays, and the likes
			if not callable(getattr(data, 'tolist', None)): raise AttributeError
		# Not a bare "except" - KeyboardInterrupt and SystemExit should not be suppressed here
		except Exception: pass # can raise other errors with custom types
		else: return self.represent_data(data.tolist())
		if self.pyaml_repr_unknown: # repr value as a short oneliner
			if isinstance(n := self.pyaml_repr_unknown, bool): n = 50
			if len(s := repr(data).replace('\n', '⏎')) > n + 10:
				# Keep trailing object address from default repr, if there is enough space for it
				if (m := re.search(r' at (0x[0-9a-f]+>)$', s)) and n > len(m[0]):
					s = s[:n-len(m[0])] + f' ~[{n:,d}/{len(s):,d}]~ ' + m[1]
				else: s = s[:n] + f' ...[{n:,d}/{len(s):,d}]'
			cls, node = data.__class__, self.represent_data(s)
			# Type name is omitted from the comment if it is already there in repr string
			if (st := f'{cls.__module__}.{cls.__name__}') in s: st = 'value'
			node.value = (s := self.str_ext(s)); s.ext = f'# python {st}'; return node
		return super().represent_undefined(data) # will raise RepresenterError

	def write_ext(self, func, text, *args, **kws):
		'''Call parent-class write_<func> method for text, and if text
			has non-empty "ext" attribute, write that after it as a plain scalar.'''
		# Emitter write-funcs extension to append comments to values
		if ext := getattr(text, 'ext', None):
			# Commented values are enums/class-reprs and such, which shouldn't be split
			if args: args = [False, *args[1:]]
			else: kws['split'] = False
		getattr(super(), f'write_{func}')(text, *args, **kws)
		if ext: super().write_plain(ext, split=False)
	# All emitter write-methods for scalars are routed through write_ext above
	write_folded = lambda s,v,*a,**kw: s.write_ext('folded', v, *a, **kw)
	write_literal = lambda s,v,*a,**kw: s.write_ext('literal', v, *a, **kw)
	write_single_quoted = lambda s,v,*a,**kw: s.write_ext('single_quoted', v, *a, **kw)
	write_double_quoted = lambda s,v,*a,**kw: s.write_ext('double_quoted', v, *a, **kw)
	write_plain = lambda s,v,split=True: s.write_ext('plain', v, split)


# Alias for a class that used to be separate in <23.x versions, kept for compatibility
UnsafePYAMLDumper = PYAMLDumper

add_representer = PYAMLDumper.add_representer

# Booleans are dumped as yes/no strings with bool tag, None as an empty null-tagged scalar
add_representer( bool, lambda dumper, flag:
	dumper.represent_scalar('tag:yaml.org,2002:bool', 'yes' if flag else 'no') )
add_representer( type(None), lambda dumper, _none:
	dumper.represent_scalar('tag:yaml.org,2002:null', '') )
add_representer(str, PYAMLDumper.represent_str)

# These container types are passed to the same representers as dict/list
add_representer(cs.defaultdict, PYAMLDumper.represent_dict)
add_representer(cs.OrderedDict, PYAMLDumper.represent_dict)
add_representer(set, PYAMLDumper.represent_list)
# Type that pathlib.Path() returns here is represented via its str() value
add_representer( type(pathlib.Path('')),
	lambda dumper, path: dumper.represent_data(str(path)) )
# Registered under None key, instead of a specific type
add_representer(None, PYAMLDumper.represent_undefined)


def dump_add_vspacing( yaml_str,
		split_lines=40, split_count=2, oneline_group=False, oneline_split=False ):
	'''Add some newlines to separate overly long YAML lists/mappings.
		"long" means both >split_lines in length and has >split_count items.
		oneline_group - don't split consecutive oneliner list/map items.
		oneline_split - split long list/map consisting only of oneliner values.
		Returns resulting string, stripped of leading/trailing whitespace,
			with a single newline at the end and no more than one empty line in a row.
		Lines at the start which are empty or begin with "#" are left as-is.'''
	def _add_vspacing(lines):
		# Processes a list of lines at the same nesting level, modifying and returning it
		#  a, a_seq - start index of the current nested block, and whether its parent is a "- key:" line
		#  ind_re - matches lines at this indentation level, ind_re_sub - lines nested under last key
		#  has_sub - whether any nested lines were seen, i.e. not everything is a oneliner
		a = a_seq = ind_re = ind_re_sub = has_sub = None
		blocks, item_lines = list(), list()
		for n, line in enumerate(lines):
			# Indentation is detected from the first line that is not empty or a comment
			if ind_re is None and (m := re.match(r'( *)([^# ].?)', line)):
				ind_re = re.compile(m[1] + r'\S')
				# Appended to the list that is being iterated, so the loop will process it too
				lines.append(f'{m[1]}.') # for last add_vspacing
			if ind_re_sub:
				if ind_re_sub.match(line): has_sub = True; continue
				# Nested block has ended here - process it recursively, if it is long enough
				if n - a > split_lines and (block := lines[a:n]):
					# For "- key:" parent line, its copy without "-" is added to have same indent
					if a_seq: block.insert(0, lines[a-1].replace('- ', '  ', 1))
					blocks.append((a, n, _add_vspacing(block)[a_seq:]))
				ind_re_sub = None
			# ind_re is still None for leading empty/comment lines, which are not counted as items
			if ind_re and ind_re.match(line): item_lines.append(n)
			if m := re.match(r'( *)(- )?\S.*:(\s|$)', line):
				a, a_seq, ind_re_sub = n+1, bool(m[2]), re.compile(m[1] + ' ')
		if ( split_items := len(lines) > split_lines and
				len(item_lines) > split_count and (oneline_split or has_sub) ):
			for n in item_lines:
				try:
					if ( oneline_group and ind_re
						and ind_re.match(lines[n-1].lstrip('\n'))
						and ind_re.match(lines[n+1].lstrip('\n')) ): continue
				except IndexError: continue
				lines[n] = f'\n{lines[n]}'
		# Replaced in reverse order, so that indexes of earlier blocks stay valid
		for a, b, block in reversed(blocks): lines[a:b] = block
		if ind_re: lines.pop() # removes the line appended in the loop above
		if split_items: lines.append('')
		return lines
	yaml_str = '\n'.join(_add_vspacing(yaml_str.splitlines()))
	return re.sub(r'\n\n+', '\n\n', yaml_str.strip() + '\n')


def dump( data, dst=None, safe=None, force_embed=True, vspacing=True,
		string_val_style=None, sort_dicts=None, multiple_docs=False, width=100,
		repr_unknown=False, **pyyaml_kws ):
	'''Serialize data as pretty-YAML to specified dst file-like object,
		or return as str with dst=str (default) or encoded to bytes with dst=bytes.
		safe - ignored, only produces a warning when it is not None.
		force_embed, string_val_style, sort_dicts, repr_unknown - passed to PYAMLDumper.
		vspacing - True or a dict of keywords for dump_add_vspacing, None/False to disable it.
			Passed dict is not modified. Other types of values produce a warning.
		multiple_docs - treat data as an iterable of documents, instead of a single one.
		width and any other keywords are passed to yaml.dump_all,
			with "stream" keyword there working same as dst.
		Raises TypeError if dst and stream are both specified and are not the same object.'''
	if safe is not None:
		cat = DeprecationWarning if not safe else UserWarning
		warnings.warn( 'pyaml module "safe" arg/keyword is ignored as implicit'
			' safe=maybe-true?, as of pyaml >= 23.x', category=cat, stacklevel=2 )
	if sort_dicts is not None and not isinstance(sort_dicts, PYAMLSort):
		warnings.warn( 'Using pyaml module sort_dicts as boolean is deprecated as of'
				' pyaml >= 23.x - translated to sort_keys PyYAML keyword, use that instead',
			DeprecationWarning, stacklevel=2 )
	if stream := pyyaml_kws.pop('stream', None):
		if dst is not None and stream is not dst:
			raise TypeError( 'Using different pyaml dst='
				' and pyyaml stream= options at the same time is not supported' )
		dst = stream
	elif dst is None: dst = str # old default

	# Always dumped to a str buffer first, for vspacing post-processing below
	buff = io.StringIO()
	Dumper = lambda *a,**kw: PYAMLDumper( *a, **kw,
		force_embed=force_embed, string_val_style=string_val_style,
		sort_dicts=sort_dicts, repr_unknown=repr_unknown )
	if not multiple_docs: data = [data]
	else: pyyaml_kws.setdefault('explicit_start', True)
	yaml.dump_all( data, buff, Dumper=Dumper, width=width,
		default_flow_style=False, allow_unicode=True, **pyyaml_kws )
	buff = buff.getvalue()

	if vspacing not in [None, False]:
		if vspacing is True: vspacing = dict()
		# Copied so that setdefault() below does not alter the dict that caller passed in
		elif isinstance(vspacing, dict): vspacing = dict(vspacing)
		else:
			warnings.warn(
				'Unsupported pyaml "vspacing" parameter type:'
					f' [{vspacing.__class__.__name__}] {vspacing}\n'
				'As of pyaml >= 23.x it should be either True or keywords-dict'
				' for pyaml_add_vspacing, and any other values are ignored,'
				' enabling default vspacing behavior.', DeprecationWarning, stacklevel=2 )
			vspacing = dict()
		if sort_dicts is PYAMLSort.oneline_group: vspacing.setdefault('oneline_group', True)
		buff = dump_add_vspacing(buff, **vspacing)

	if dst is bytes: return buff.encode()
	elif dst is str: return buff
	else:
		try: dst.write(b'') # tests if dst is str- or bytestream
		# Not a bare "except" - KeyboardInterrupt and SystemExit should not be suppressed here
		except Exception: dst.write(buff)
		else: dst.write(buff.encode())


# Simpler pyaml.dump() aliases

def dump_all(data, *dump_args, **dump_kws):
	'''Same as dump(), but always with multiple_docs=True, where
		data is a list of documents, for API compatibility with pyyaml.'''
	return dump(data, *dump_args, **dump_kws, multiple_docs=True)

def dumps(data, **dump_kws):
	'''Same as dump(), but with keyword-only options,
		named for API compatibility with stdlib conventions.'''
	result = dump(data, **dump_kws)
	return result

def pprint(*data, **dump_kws):
	'''Dump any number of values, similar to how print() works.
		Destination is picked from file= or dst= keyword (in that order), default is sys.stdout.
		Single value is dumped as-is, any other number of them - as a tuple.'''
	# Both keywords are always removed from dump_kws, regardless of which one gets used
	dst_fallback = dump_kws.pop('dst', sys.stdout)
	dst = dump_kws.pop('file', dst_fallback)
	if len(data) == 1: data = data[0]
	dump(data, dst=dst, **dump_kws)

def debug(*data, **dump_kws):
	'''Same as pprint(), but with repr_unknown=True option,
		which is for repr-printing any non-yaml types.'''
	pprint(*data, **dump_kws, repr_unknown=True)

# _p calls builtin print with flush=True - reference to it is stored in lambda default
#  value, because "print" name in this module is rebound to pprint on the next line
_p = lambda *a,_p=print,**kw: _p(*a, **kw, flush=True) # to use here for debug
# Short aliases for pprint, including one under the same name as builtin print
p = print = pprint