File: decoders.py

package info (click to toggle)
json-tricks 3.17.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 372 kB
  • sloc: python: 2,319; makefile: 159
file content (375 lines) | stat: -rw-r--r-- 12,125 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
import sys
import warnings
from base64 import standard_b64decode
from collections import OrderedDict
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from fractions import Fraction

from json_tricks import NoEnumException, NoPandasException, NoNumpyException
from .utils import ClassInstanceHookBase, nested_index, str_type, gzip_decompress, filtered_wrapper


class DuplicateJsonKeyException(Exception):
	"""
	Raised when a json map that contains the same key more than once is loaded
	while `allow_duplicates` is False.
	"""


class TricksPairHook(object):
	"""
	Callable that builds a python map (dict or OrderedDict) from the key/value pairs
	of a json map, and then passes that map through a chain of hooks.
	"""
	def __init__(self, ordered=True, obj_pairs_hooks=None, allow_duplicates=True, properties=None):
		"""
		:param ordered: If truthy, maps are built as OrderedDict; otherwise as plain dict.
		:param obj_pairs_hooks: Optional iterable of hooks; each one is wrapped with `filtered_wrapper`.
		:param allow_duplicates: If False, a repeated key within one map raises DuplicateJsonKeyException.
		:param properties: Optional dict passed to every hook as the `properties` keyword (empty dict if omitted).
		"""
		self.allow_duplicates = allow_duplicates
		self.properties = properties or {}
		self.map_type = OrderedDict if ordered else dict
		if obj_pairs_hooks:
			self.obj_pairs_hooks = [filtered_wrapper(hook) for hook in obj_pairs_hooks]
		else:
			self.obj_pairs_hooks = []

	def __call__(self, pairs):
		"""
		:param pairs: The (key, value) pairs of a single json map.
		:return: The map built from `pairs`, after every hook has been applied in order.
		"""
		if not self.allow_duplicates:
			message = ('Trying to load a json map which contains a ' +
				'duplicate key "{0:}" (but allow_duplicates is False)')
			seen = set()
			for key, _ in pairs:
				if key in seen:
					raise DuplicateJsonKeyException(message.format(key))
				seen.add(key)
		result = self.map_type(pairs)
		for hook in self.obj_pairs_hooks:
			# Each hook receives the output of the previous one.
			result = hook(result, properties=self.properties)
		return result


def json_date_time_hook(dct):
	"""
	Turn an encoded date, time, datetime or timedelta back into its python object,
	restoring the timezone (through `pytz`) when one was stored.

	:param dct: (dict) json encoded date, time, datetime or timedelta; anything else is returned as-is
	:return: (date/time/datetime/timedelta obj) the decoded object, or `dct` itself if it has no known marker
	"""
	def fields(*names):
		# Fields that are absent from the map default to 0.
		return dict((name, dct.get(name, 0)) for name in names)

	def load_timezone():
		# pytz is only needed (and only imported) when the map actually stores a timezone.
		if 'tzinfo' not in dct:
			return None
		try:
			import pytz
		except ImportError as err:
			raise ImportError(('Tried to load a json object which has a timezone-aware (date)time. '
				'However, `pytz` could not be imported, so the object could not be loaded. '
				'Error: {0:}').format(str(err)))
		return pytz.timezone(dct['tzinfo'])

	if not isinstance(dct, dict):
		return dct
	if '__date__' in dct:
		return date(**fields('year', 'month', 'day'))
	if '__time__' in dct:
		zone = load_timezone()
		return time(tzinfo=zone, **fields('hour', 'minute', 'second', 'microsecond'))
	if '__datetime__' in dct:
		zone = load_timezone()
		naive = datetime(**fields('year', 'month', 'day', 'hour', 'minute', 'second', 'microsecond'))
		if zone is None:
			return naive
		return zone.localize(naive, is_dst=dct.get('is_dst', None))
	if '__timedelta__' in dct:
		return timedelta(**fields('days', 'seconds', 'microseconds'))
	return dct


def json_complex_hook(dct):
	"""
	Turn an encoded complex number back into a python complex.

	:param dct: (dict) json map with a `__complex__` entry holding [real, imaginary]; anything else is returned as-is
	:return: python complex number, or `dct` itself if it is not an encoded complex
	"""
	if not (isinstance(dct, dict) and '__complex__' in dct):
		return dct
	pair = dct['__complex__']
	assert len(pair) == 2
	real_part, imag_part = pair[0], pair[1]
	return real_part + imag_part * 1j


def json_bytes_hook(dct):
	"""
	Turn encoded bytes, stored either as base64 or as utf8 text, back into python bytes.

	:param dct: any object; only a dict with a `__bytes_b64__` or `__bytes_utf8__` entry is converted
	:return: the decoded bytes, or `dct` itself if it holds no encoded bytes
	"""
	if isinstance(dct, dict):
		# The base64 marker takes precedence when both markers are present.
		if '__bytes_b64__' in dct:
			return standard_b64decode(dct['__bytes_b64__'])
		if '__bytes_utf8__' in dct:
			text = dct['__bytes_utf8__']
			return text.encode('utf-8')
	return dct


def numeric_types_hook(dct):
	"""
	Turn an encoded Decimal or Fraction back into its python object.

	:param dct: any object; only a dict with a `__decimal__` or `__fraction__` entry is converted
	:return: Decimal, Fraction, or `dct` itself if it holds neither marker
	"""
	if isinstance(dct, dict):
		if '__decimal__' in dct:
			return Decimal(dct['__decimal__'])
		if '__fraction__' in dct:
			numer, denom = dct['numerator'], dct['denominator']
			return Fraction(numerator=numer, denominator=denom)
	return dct


def noenum_hook(dct):
	"""
	Pass `dct` through unchanged, unless it is a map with an `__enum__` entry,
	in which case NoEnumException is raised.
	"""
	if not isinstance(dct, dict) or '__enum__' not in dct:
		return dct
	raise NoEnumException(('Trying to decode a map which appears to represent a enum '
		'data structure, but enum support is not enabled, perhaps it is not installed.'))


def pathlib_hook(dct):
	"""
	Turn a map with a `__pathlib__` entry back into a `pathlib.Path`.

	:param dct: any object; only a dict with a `__pathlib__` entry is converted
	:return: Path built from the stored value, or `dct` itself otherwise
	"""
	if isinstance(dct, dict) and '__pathlib__' in dct:
		# Imported lazily, only when a path actually has to be decoded.
		from pathlib import Path
		return Path(dct['__pathlib__'])
	return dct


class NoPathlib(Exception):
	""" Trying to decode a map which appears to represent a pathlib.Path, but pathlib support is not enabled. """


def nopathlib_hook(dct):
	"""
	Pass `dct` through unchanged, unless it is a map with a `__pathlib__` entry.

	:param dct: any object
	:return: `dct` itself
	:raises NoPathlib: if `dct` is a dict that contains a `__pathlib__` key
	"""
	if isinstance(dct, dict) and '__pathlib__' in dct:
		# `NoPathlib` is defined right above: it is not among the exceptions this module
		# imports, so raising it used to fail with a NameError instead.
		raise NoPathlib(('Trying to decode a map which appears to represent a '
						'pathlib.Path data structure, but pathlib support '
						'is not enabled.'))
	return dct

def slice_hook(dct):
	"""
	Turn a map with a `__slice__` entry back into a python slice.

	:param dct: any object; only a dict with a `__slice__` entry is converted
	:return: slice built from the `start`, `stop` and `step` entries, or `dct` itself otherwise
	"""
	if isinstance(dct, dict) and '__slice__' in dct:
		start, stop, step = dct['start'], dct['stop'], dct['step']
		return slice(start, stop, step)
	return dct


class EnumInstanceHook(ClassInstanceHookBase):
	"""
	This hook tries to convert json encoded by enum_instance_encode back to its original instance.
	It only works if the environment is the same, e.g. the enum is similarly importable and hasn't changed.
	"""
	def __call__(self, dct, properties=None):
		"""
		:param dct: any object; only a dict with an `__enum__` entry is converted
		:param properties: optional dict; its `cls_lookup_map` entry (if any) is passed on to the class lookup
		:return: the enum member looked up by its stored name, or `dct` itself if it is not an encoded enum
		"""
		if not isinstance(dct, dict):
			return dct
		if '__enum__' not in dct:
			return dct
		# `properties` defaults to None and `cls_lookup_map` may be stored as None,
		# so fall back to an empty dict in both cases.
		cls_lookup_map = (properties or {}).get('cls_lookup_map') or {}
		mod, name = dct['__enum__']['__enum_instance_type__']
		Cls = self.get_cls_from_instance_type(mod, name, cls_lookup_map=cls_lookup_map)
		return Cls[dct['__enum__']['name']]


class ClassInstanceHook(ClassInstanceHookBase):
	"""
	This hook tries to convert json encoded by class_instance_encoder back to its original instance.
	It only works if the environment is the same, e.g. the class is similarly importable and hasn't changed.
	"""
	def __call__(self, dct, properties=None):
		"""
		:param dct: any object; only a dict with an `__instance_type__` entry is converted
		:param properties: optional dict; its `cls_lookup_map` entry (if any) is passed on to the class lookup
		:return: the restored instance, or `dct` itself if it is not an encoded instance
		:raises TypeError: if the class cannot be instantiated through `__new__` without arguments
		"""
		if not isinstance(dct, dict):
			return dct
		if '__instance_type__' not in dct:
			return dct
		# `properties` defaults to None, so guard before calling `.get` on it.
		cls_lookup_map = (properties or {}).get('cls_lookup_map') or {}
		mod, name = dct['__instance_type__']
		Cls = self.get_cls_from_instance_type(mod, name, cls_lookup_map=cls_lookup_map)
		try:
			# Bypass __init__: the stored state is restored explicitly below.
			obj = Cls.__new__(Cls)
		except TypeError:
			raise TypeError(('problem while decoding instance of "{0:s}"; this instance has a special '
				'__new__ method and can\'t be restored').format(name))
		if hasattr(obj, '__json_decode__'):
			# The class restores itself; slots and attributes are merged into one set of
			# keyword arguments, attributes winning on a name clash.
			decode_kwargs = {}
			if 'slots' in dct:
				decode_kwargs.update(dct['slots'])
			if 'attributes' in dct:
				decode_kwargs.update(dct['attributes'])
			obj.__json_decode__(**decode_kwargs)
		else:
			if 'slots' in dct:
				for slot, value in dct['slots'].items():
					setattr(obj, slot, value)
			if 'attributes' in dct:
				obj.__dict__ = dict(dct['attributes'])
		return obj


def json_set_hook(dct):
	"""
	Turn an encoded set back into a python set.

	:param dct: any object; only a dict with a `__set__` entry is converted
	:return: the decoded set, or `dct` itself otherwise
	"""
	if not (isinstance(dct, dict) and '__set__' in dct):
		return dct
	members = set()
	for item in dct['__set__']:
		# Lists are not hashable, so they are stored in the set as tuples.
		members.add(tuple(item) if isinstance(item, list) else item)
	return members


def pandas_hook(dct):
	"""
	Turn an encoded pandas DataFrame or Series back into its pandas object.

	The input map is left unmodified.

	:param dct: any object; only a dict with a `__pandas_dataframe__` or `__pandas_series__` entry is converted
	:return: DataFrame, Series, or `dct` itself if it holds neither marker
	:raises NoPandasException: if pandas cannot be imported
	"""
	if not isinstance(dct, dict):
		return dct
	is_dataframe = '__pandas_dataframe__' in dct
	if not is_dataframe and '__pandas_series__' not in dct:
		return dct
	try:
		from pandas import DataFrame, Series
	except ImportError:
		raise NoPandasException('Trying to decode a map which appears to represent a pandas data structure, but pandas appears not to be installed.')
	from numpy import dtype, array
	indx = dct.get('index')
	if is_dataframe:
		meta = dct['__pandas_dataframe__']
		dtypes = dict((colname, dtype(tp)) for colname, tp in zip(meta['column_order'], meta['types']))
		data = OrderedDict()
		for name, col in dct.items():
			# The marker and the index are metadata, every other entry is a column.
			if name in ('__pandas_dataframe__', 'index'):
				continue
			data[name] = array(col, dtype=dtypes[name])
		return DataFrame(
			data=data,
			index=indx,
			columns=meta['column_order'],
			# mixed `dtypes` argument not supported, so use dict of numpy arrays
		)
	meta = dct['__pandas_series__']
	return Series(
		data=dct['data'],
		index=indx,
		name=meta['name'],
		dtype=dtype(meta['type']),
	)


def nopandas_hook(dct):
	"""
	Pass `dct` through unchanged, unless it is a map with a `__pandas_dataframe__` or
	`__pandas_series__` entry, in which case NoPandasException is raised.
	"""
	if not isinstance(dct, dict):
		return dct
	if '__pandas_dataframe__' not in dct and '__pandas_series__' not in dct:
		return dct
	raise NoPandasException(('Trying to decode a map which appears to represent a pandas '
		'data structure, but pandas support is not enabled, perhaps it is not installed.'))


def json_numpy_obj_hook(dct):
	"""
	Restore a numpy array or scalar, previously encoded by `numpy_encode`, with its
	shape, data type and data.

	:param dct: (dict) json encoded ndarray; anything without an `__ndarray__` entry is returned as-is
	:return: (ndarray) if input was an encoded ndarray
	"""
	if not (isinstance(dct, dict) and '__ndarray__' in dct):
		return dct
	try:
		import numpy
	except ImportError:
		raise NoNumpyException('Trying to decode a map which appears to represent a numpy '
			'array, but numpy appears not to be installed.')
	# No `Corder` entry means no explicit memory order.
	order = ('C' if dct['Corder'] else 'F') if 'Corder' in dct else None
	payload = dct['__ndarray__']
	shape = tuple(dct['shape'])
	type_name = dct['dtype']
	if not shape:
		# An empty shape marks a scalar.
		return _scalar_to_numpy(payload, type_name)
	if type_name == 'object':
		return _lists_of_obj_to_ndarray(payload, order, shape, type_name)
	if not isinstance(payload, str_type):
		return _lists_of_numbers_to_ndarray(payload, order, shape, type_name)
	# A string payload is the binary format.
	return _bin_str_to_ndarray(payload, order, shape, type_name, dct.get('endian', 'native'))


def _bin_str_to_ndarray(data, order, shape, np_type_name, data_endianness):
	"""
	From base64 encoded, optionally gzipped, binary data to ndarray.

	:param data: (str) the encoded buffer, prefixed by `b64:` or `b64.gz:`
	:param order: memory order; only None and 'C' are supported for the binary format
	:param shape: shape the decoded array is reshaped to
	:param np_type_name: name of the numpy data type of the elements
	:param data_endianness: 'little', 'big' or 'native'; any other value gives a warning
		and the data is read in native byte order
	:return: (ndarray) the decoded array
	:raises AssertionError: if `order` is not None or 'C'
	:raises ValueError: if `data` does not start with a supported prefix
	"""
	from base64 import standard_b64decode
	from numpy import frombuffer, dtype

	if order not in (None, 'C'):
		# Raised explicitly instead of through `assert`, so the check is not
		# stripped when running with `python -O`.
		raise AssertionError('specifying different memory order is not (yet) supported '
			'for binary numpy format (got order = {})'.format(order))
	if data.startswith('b64.gz:'):
		data = standard_b64decode(data[7:])
		data = gzip_decompress(data)
	elif data.startswith('b64:'):
		data = standard_b64decode(data[4:])
	else:
		raise ValueError('found numpy array buffer, but did not understand header; supported: b64 or b64.gz')
	np_type = dtype(np_type_name)
	# Data that is already in this machine's byte order needs no conversion.
	if data_endianness not in (sys.byteorder, 'native'):
		if data_endianness == 'little':
			np_type = np_type.newbyteorder('<')
		elif data_endianness == 'big':
			np_type = np_type.newbyteorder('>')
		else:
			warnings.warn('array of shape {} has unknown endianness \'{}\''.format(shape, data_endianness))
	# `bytearray` gives numpy a buffer of its own to build the array on.
	data = frombuffer(bytearray(data), dtype=np_type)
	return data.reshape(shape)


def _lists_of_numbers_to_ndarray(data, order, shape, dtype):
	"""
	Build an ndarray from a nested list of numbers.

	The shape of the result follows from `data`; `shape` is only applied when it
	contains a 0, and otherwise just compared (a mismatch gives a warning).
	"""
	from numpy import asarray
	result = asarray(data, dtype=dtype, order=order)
	has_empty_axis = 0 in shape
	if has_empty_axis:
		return result.reshape(shape)
	if shape != result.shape:
		message = 'size mismatch decoding numpy array: expected {}, got {}'.format(shape, result.shape)
		warnings.warn(message)
	return result


def _lists_of_obj_to_ndarray(data, order, shape, dtype):
	"""
	Build an ndarray from a nested list of objects (that aren't native numpy numbers).

	The array is allocated first and then filled one element at a time, each value
	being fetched from `data` with `nested_index` at the element's index.
	"""
	from numpy import empty, ndindex
	result = empty(shape, dtype=dtype, order=order)
	for position in ndindex(result.shape):
		result[position] = nested_index(data, position)
	return result


def _scalar_to_numpy(data, dtype):
	"""
	From scalar value to numpy type.

	:param data: the scalar value stored in the json map
	:param dtype: (str) name of the numpy data type, e.g. 'float64'
	:return: numpy scalar of the requested type
	:raises TypeError: if `dtype` does not name a numpy data type
	"""
	import numpy
	scalar_type = getattr(numpy, dtype, None)
	# `dtype` comes straight from the json input, so only accept numpy scalar types:
	# calling an arbitrary numpy attribute (e.g. `fromfile`) with user data is unsafe.
	if not (isinstance(scalar_type, type) and issubclass(scalar_type, numpy.generic)):
		# Not a scalar type by that name; let numpy resolve the name as a data type
		# (this raises TypeError for names that are not data types).
		scalar_type = numpy.dtype(dtype).type
	return scalar_type(data)


def json_nonumpy_obj_hook(dct):
	"""
	Pass `dct` through unchanged, unless it is a map with an `__ndarray__` entry, in which
	case NoNumpyException is raised to give a useful message about the missing numpy support.
	"""
	if not isinstance(dct, dict) or '__ndarray__' not in dct:
		return dct
	raise NoNumpyException(('Trying to decode a map which appears to represent a numpy array, '
		'but numpy support is not enabled, perhaps it is not installed.'))