File: test_np.py

package info (click to toggle)
json-tricks 3.17.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 372 kB
  • sloc: python: 2,319; makefile: 159
file content (383 lines) | stat: -rw-r--r-- 14,471 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from copy import deepcopy
from os.path import join
from tempfile import mkdtemp
import sys
from warnings import catch_warnings, simplefilter

from pytest import warns
from numpy import arange, ones, array, array_equal, finfo, iinfo, pi
from numpy import int8, int16, int32, int64, uint8, uint16, uint32, uint64, \
	float16, float32, float64, complex64, complex128, zeros, ndindex
from numpy.core.umath import exp
from numpy.testing import assert_equal

from json_tricks import numpy_encode
from json_tricks.np import dump, dumps, load, loads
from json_tricks.np_utils import encode_scalars_inplace
from json_tricks.utils import JsonTricksDeprecation, gzip_decompress
from .test_bare import cls_instance
from .test_class import MyTestCls

# Numpy scalar types exercised by the type round-trip tests:
# signed integers, unsigned integers, floats and complex numbers.
DTYPES = (
	int8, int16, int32, int64,
	uint8, uint16, uint32, uint64,
	float16, float32, float64,
	complex64, complex128,
)


def get_lims(dtype):
	"""
	Return the minimum and maximum value of a numpy scalar type.

	:param dtype: numpy scalar type (callable), e.g. ``int8`` or ``float32``.
	:return: tuple ``(min, max)``, each converted to ``dtype``.
	"""
	try:
		# Try the floating point lookup first; fall back to the integer one on ValueError.
		limits = finfo(dtype)
	except ValueError:
		limits = iinfo(dtype)
	lowest, highest = limits.min, limits.max
	return dtype(lowest), dtype(highest)


# Shared fixture for the round-trip tests: a uint8 vector and a float64 matrix of ones.
npdata = dict(
	vector=arange(15, 70, 3, dtype=uint8),
	matrix=ones((15, 10), dtype=float64),
)


def _numpy_equality(d2):
	"""
	Assert that ``d2`` matches the module-level ``npdata`` fixture.

	The key sets must be equal, and both arrays must agree in values and in dtype.

	:param d2: mapping with 'vector' and 'matrix' arrays, typically a decoded copy of ``npdata``.
	"""
	assert npdata.keys() == d2.keys()
	names = ('vector', 'matrix')
	for name in names:
		assert_equal(npdata[name], d2[name])
	for name in names:
		assert npdata[name].dtype == d2[name].dtype


def test_primitives():
	"""When dumped with ``primitives=True``, the decoded arrays come back as plain (nested) lists."""
	encoded = dumps(deepcopy(npdata), primitives=True)
	decoded = loads(encoded)
	names = ('vector', 'matrix')
	for name in names:
		assert isinstance(decoded[name], list)
	# The matrix is two-dimensional, so its rows must be lists as well.
	assert isinstance(decoded['matrix'][0], list)
	for name in names:
		reference = npdata[name]
		assert decoded[name] == reference.tolist()
		assert (abs(array(decoded[name]) - reference)).sum() < 1e-10


def test_dumps_loads_numpy():
	"""Round-trip the ``npdata`` fixture through a JSON string."""
	encoded = dumps(deepcopy(npdata))
	_numpy_equality(loads(encoded))


def test_file_numpy():
	"""Round-trip the ``npdata`` fixture through a compressed file in a fresh temporary directory."""
	target = join(mkdtemp(), 'pytest-np.json')
	with open(target, 'wb+') as out_fh:
		dump(deepcopy(npdata), out_fh, compression=9)
	with open(target, 'rb') as in_fh:
		restored = load(in_fh, decompression=True)
	_numpy_equality(restored)


def test_compressed_to_disk():
	"""Dump a compact-encoded array to a binary file with compression enabled; passes if nothing raises."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='little')
	payload = [array([[1.0, 2.0], [3.0, 4.0]])]
	target = join(mkdtemp(), 'pytest-np.json.gz')
	with open(target, 'wb+') as out_fh:
		dump(payload, out_fh, compression=True, properties=options)


# Fixture mixing a numpy array with a class instance that itself holds
# an array and another (nested) instance.
mixed_data = dict(
	vec=array(range(10)),
	inst=MyTestCls(
		nr=7,
		txt='yolo',
		li=[1, 1, 2, 3, 5, 8, 12],
		vec=array(range(7, 16, 2)),
		inst=cls_instance,
	),
)


def test_mixed_cls_arr():
	"""Round-trip ``mixed_data``: arrays and attributes of the (nested) class instances must survive."""
	encoded = dumps(mixed_data)
	back = dict(loads(encoded))
	assert mixed_data.keys() == back.keys()
	assert (mixed_data['vec'] == back['vec']).all()
	expected_inst = mixed_data['inst']
	actual_inst = back['inst']
	assert (expected_inst.vec == actual_inst.vec).all()
	assert expected_inst.nr == actual_inst.nr
	assert expected_inst.li == actual_inst.li
	# Attributes of the instance nested inside the instance.
	assert expected_inst.inst.s == actual_inst.inst.s
	assert expected_inst.inst.dct == dict(actual_inst.inst.dct)


def test_memory_order():
	"""C-ordered and Fortran-ordered arrays keep their values and contiguity flags after a round-trip."""
	for order in ('C', 'F'):
		original = array([[1., 2.], [3., 4.]], order=order)
		restored = loads(dumps(original))
		assert array_equal(original, restored)
		for flag in ('C_CONTIGUOUS', 'F_CONTIGUOUS'):
			assert original.flags[flag] == restored.flags[flag]


def test_scalars_types():
	"""Round-trip zero plus the extreme values of every type in ``DTYPES`` as scalars in a list."""
	# from: https://docs.scipy.org/doc/numpy/user/basics.types.html
	scalars = []
	for dtype in DTYPES:
		candidates = (dtype(0),) + get_lims(dtype)
		for candidate in candidates:
			assert isinstance(candidate, dtype)
			scalars.append(candidate)
	encoded = dumps(scalars, indent=2)
	decoded = loads(encoded)
	assert scalars == decoded


def test_array_types():
	"""For every type in ``DTYPES``, an array of that dtype keeps dtype and values after a round-trip."""
	# from: https://docs.scipy.org/doc/numpy/user/basics.types.html
	# see also `test_scalars_types`
	for dtype in DTYPES:
		values = (dtype(0), dtype(exp(1))) + get_lims(dtype)
		wrapped = [array(values, dtype=dtype)]
		encoded = dumps(wrapped)
		# The dtype name must be part of the encoded representation.
		assert dtype.__name__ in encoded
		decoded = loads(encoded)
		assert decoded[0].dtype == dtype
		assert array_equal(wrapped, decoded)


def test_encode_scalar():
	"""``encode_scalars_inplace`` replaces a numpy scalar by a dict holding value, shape and dtype name."""
	encoded = encode_scalars_inplace([complex128(1+2j)])
	first = encoded[0]
	assert isinstance(first, dict)
	assert first['__ndarray__'] == 1+2j
	assert first['shape'] == ()
	assert first['dtype'] == complex128.__name__


def test_dump_np_scalars():
	"""Numpy scalars nested in lists, tuples, dicts and sets survive encode_scalars_inplace + dumps + loads."""
	nested = (
		{
			'alpha': float64(-exp(10)),
			'str-only': complex64(-1-1j),
		},
		uint32(123456789),
		float16(exp(-1)),
		set((
			int64(37),
			uint64(-0),
		)),
	)
	data = [
		int8(-27),
		complex64(exp(1)+37j),
		nested,
	]
	# Encode a copy, so that `data` keeps the original scalars to compare against.
	replaced = encode_scalars_inplace(deepcopy(data))
	rec = loads(dumps(replaced))
	for top in (0, 1):
		assert data[top] == rec[top]
	for pos in range(4):
		assert data[2][pos] == rec[2][pos]
	assert data[2] == tuple(rec[2])


def test_ndarray_object_nesting():
	"""
	Object-dtype arrays whose cells hold sequences keep shape, dtype and content after a round-trip.

	Based on issue 53. Checked once with ndarrays as cells and once with plain lists as cells.
	"""
	cell_factories = (
		('ndarrays', lambda: array([1, 2, 3])),
		('lists', lambda: [1, 2, 3]),
	)
	for label, make_cell in cell_factories:
		before = zeros((2, 2,), dtype=object)
		for i in ndindex(before.shape):
			before[i] = make_cell()
		after = loads(dumps(before))
		assert before.shape == after.shape, \
			'shape of array changed for nested {}:\n{}'.format(label, dumps(before, indent=2))
		# The dtype must survive the round-trip, so compare against the decoded array
		# (comparing `before` with itself would always pass).
		assert before.dtype == after.dtype
		assert array_equal(before[0, 0], after[0, 0])


def test_dtype_object():
	"""An object-dtype array of strings is equal to the original after a round-trip."""
	# Based on issue 64
	original = array(['a', 'b', 'c'], dtype=object)
	restored = loads(dumps(original))
	assert array_equal(restored, original)


def test_compact_mode_unspecified():
	"""
	Compressed output without an explicit ``ndarray_compact`` setting.

	The first call must raise ``JsonTricksDeprecation``, a second call must not warn,
	and both must produce the same non-compact encoding.
	"""
	# Other tests may have raised deprecation warning, so reset the cache here
	numpy_encode._warned_compact = False
	arrays = [array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]), array([pi, exp(1)])]
	with warns(JsonTricksDeprecation):
		first_gz = dumps(arrays, compression=True)
	with catch_warnings():
		# Any warning raised inside this block becomes an error.
		simplefilter("error")
		second_gz = dumps(arrays, compression=True)
	assert first_gz == second_gz
	text = gzip_decompress(first_gz).decode('ascii')
	expected = (
		'[{"__ndarray__": [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]], "dtype": "float64", "shape": [2, 4], "Corder": true}, '
		'{"__ndarray__": [3.141592653589793, 2.718281828459045], "dtype": "float64", "shape": [2]}]'
	)
	assert text == expected


def test_compact():
	"""Compact, compressed encoding round-trips a range of float values exactly."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='little')
	data = [array([2**(x + 0.5) for x in range(-30, 31)])]
	encoded = dumps(data, compression=True, properties=options)
	assert_equal(data, loads(encoded))


def test_encode_disable_compact():
	"""With ``ndarray_compact=False`` arrays are written as nested lists of numbers."""
	arrays = [array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]), array([pi, exp(1)])]
	compressed = dumps(arrays, compression=True, properties=dict(ndarray_compact=False))
	text = gzip_decompress(compressed).decode('ascii')
	expected = (
		'[{"__ndarray__": [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]], "dtype": "float64", "shape": [2, 4], "Corder": true}, '
		'{"__ndarray__": [3.141592653589793, 2.718281828459045], "dtype": "float64", "shape": [2]}]'
	)
	assert text == expected


def test_encode_enable_compact_little_endian():
	"""Compact encoding with little-endian byte order yields base64 data tagged ``"endian": "little"``."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='little')
	arrays = [array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]), array([pi, exp(1)])]
	compressed = dumps(arrays, compression=True, properties=options)
	text = gzip_decompress(compressed).decode('ascii')
	expected = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
		'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "Corder": '
		'true, "endian": "little"}, {"__ndarray__": "b64:GC1EVPshCUBpVxSLCr8FQA==", "dtype": "float64", '
		'"shape": [2], "endian": "little"}]'
	)
	assert text == expected


def test_encode_enable_compact_big_endian():
	"""Compact encoding with big-endian byte order yields base64 data tagged ``"endian": "big"``."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='big')
	matrix = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	compressed = dumps(matrix, compression=True, properties=options)
	text = gzip_decompress(compressed).decode('ascii')
	expected = (
		'{"__ndarray__": "b64:P/AAAAAAAABAAAAAAAAAAEAIAAAAAAAAQBAAAAAAAABAFAAAAAAAAEAYAA'
		'AAAAAAQBwAAAAAAABAIAAAAAAAAA==", "dtype": "float64", "shape": [2, 4], "Corder": '
		'true, "endian": "big"}'
	)
	assert text == expected


def test_encode_enable_compact_native_endian():
	"""
	Compact encoding without an explicit byte order is compared against the
	expectation for the system's own byte order (``sys.byteorder``).

	:raises Exception: if ``sys.byteorder`` is neither 'little' nor 'big'.
	"""
	matrix = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	compressed = dumps(matrix, compression=True, properties=dict(ndarray_compact=True))
	text = gzip_decompress(compressed).decode('ascii')
	expected_by_byteorder = {
		'little': (
			'{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
			'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "Corder": '
			'true, "endian": "little"}'
		),
		'big': (
			'{"__ndarray__": "b64:P/AAAAAAAABAAAAAAAAAAEAIAAAAAAAAQBAAAAAAAABAFAAAAAAAAEAYAA'
			'AAAAAAQBwAAAAAAABAIAAAAAAAAA==", "dtype": "float64", "shape": [2, 4], "Corder": '
			'true, "endian": "big"}'
		),
	}
	if sys.byteorder not in expected_by_byteorder:
		raise Exception("unknown system endianness '{}'".format(sys.byteorder))
	assert text == expected_by_byteorder[sys.byteorder]


def test_encode_enable_compact_suppress_endianness():
	"""With ``ndarray_store_byteorder='suppress'`` the output does not mention "endian" at all."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='suppress')
	matrix = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	compressed = dumps(matrix, compression=True, properties=options)
	text = gzip_decompress(compressed).decode('ascii')
	assert "endian" not in text


def test_encode_compact_cutoff():
	"""With ``ndarray_compact=5`` the 8-element array is base64-encoded and the 2-element one stays a list."""
	options = dict(ndarray_compact=5, ndarray_store_byteorder='little')
	arrays = [array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]), array([pi, exp(1)])]
	compressed = dumps(arrays, compression=True, properties=options)
	text = gzip_decompress(compressed).decode('ascii')
	expected = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
		'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "Corder": '
		'true, "endian": "little"}, {"__ndarray__": [3.141592653589793, 2.718281828459045], "dtype": "float64", '
		'"shape": [2]}]'
	)
	assert text == expected


def test_encode_compact_inline_compression():
	"""Uncompressed output of a 4x4 array in compact mode uses inline gzip (``b64.gz:``) for the array data."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='little')
	rows = [
		[1.0, 2.0, 3.0, 4.0],
		[5.0, 6.0, 7.0, 8.0],
		[9.0, 10.0, 11.0, 12.0],
		[13.0, 14.0, 15.0, 16.0],
	]
	encoded = dumps([array(rows)], compression=False, properties=options)
	assert 'b64.gz:' in encoded, 'If the overall file is not compressed and there are significant savings, then do inline gzip compression.'
	expected = (
		'[{"__ndarray__": "b64.gz:H4sIAAAAAAAC/2NgAIEP9gwQ4AChOKC0AJQWgdISUFoGSitAaSUorQKl1aC0BpTWgtI6UFoPShs4AABmfqWAgAAAAA==", '
		'"dtype": "float64", "shape": [4, 4], "Corder": true, "endian": "little"}]'
	)
	assert encoded == expected


def test_encode_compact_no_inline_compression():
	"""Uncompressed output of a 2x2 array in compact mode stays plain base64 (``b64:``), without inline gzip."""
	options = dict(ndarray_compact=True, ndarray_store_byteorder='little')
	small = [array([[1.0, 2.0], [3.0, 4.0]])]
	encoded = dumps(small, compression=False, properties=options)
	assert 'b64.gz:' not in encoded, 'If the overall file is not compressed, but there are no significant savings, then do not do inline compression.'
	expected = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEA=", '
		'"dtype": "float64", "shape": [2, 2], "Corder": true, "endian": "little"}]'
	)
	assert encoded == expected


def test_decode_compact_mixed_compactness():
	"""A single document may mix a base64-encoded array with an array stored as a list of numbers."""
	json = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
		'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "endian": "little", "Corder": '
		'true}, {"__ndarray__": [3.141592653589793, 2.718281828459045], "dtype": "float64", "shape": [2]}]'
	)
	data = loads(json)
	assert len(data) == 2
	# assert_equal takes (actual, desired, err_msg), so each decoded array needs its own
	# call; a third positional array would only be used as the error message.
	assert_equal(data[0], array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]))
	assert_equal(data[1], array([pi, exp(1)]))


def test_decode_big_endian():
	"""Base64 array data tagged ``"endian": "big"`` decodes to the expected 2x4 matrix."""
	encoded = (
		'{"__ndarray__": "b64:P/AAAAAAAABAAAAAAAAAAEAIAAAAAAAAQBAAAAAAAABAFAAAAAAAAEAYAA'
		'AAAAAAQBwAAAAAAABAIAAAAAAAAA==", "dtype": "float64", "shape": [2, 4], "Corder": '
		'true, "endian": "big"}'
	)
	expected = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	assert_equal(loads(encoded), expected)


def test_decode_little_endian():
	"""Base64 array data tagged ``"endian": "little"`` decodes to the expected 2x4 matrix."""
	encoded = (
		'{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
		'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "Corder": '
		'true, "endian": "little"}'
	)
	expected = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	assert_equal(loads(encoded), expected)


def test_decode_without_endianness():
	"""
	Base64 array data without an "endian" field decodes to the expected matrix.

	The comparison is skipped on big-endian systems; the encoded bytes are the
	little-endian representation.
	"""
	encoded = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEAAAAAAAAA'
		'UQAAAAAAAABhAAAAAAAAAHEAAAAAAAAAgQA==", "dtype": "float64", "shape": [2, 4], "Corder": true}]'
	)
	decoded = loads(encoded)
	if sys.byteorder == 'big':
		import pytest
		pytest.skip('skip for big endian systems')
	expected = array([[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]])
	assert_equal(decoded[0], expected)


def test_decode_compact_inline_compression():
	"""Inline-gzipped base64 array data (``b64.gz:``) decodes to the expected 4x4 matrix."""
	encoded = '[{"__ndarray__": "b64.gz:H4sIAAAAAAAC/2NgAIEP9gwQ4AChOKC0AJQWgdISUFoGSitAaSUorQKl1aC0BpTWgtI6UFoPShs4AABmfqWAgAAAAA==", "dtype": "float64", "shape": [4, 4], "Corder": true, "endian": "little"}]'
	expected = array([
		[1.0, 2.0, 3.0, 4.0],
		[5.0, 6.0, 7.0, 8.0],
		[9.0, 10.0, 11.0, 12.0],
		[13.0, 14.0, 15.0, 16.0],
	])
	decoded = loads(encoded)
	assert_equal(decoded[0], expected)


def test_decode_compact_no_inline_compression():
	"""Plain base64 array data (``b64:``) decodes to the expected 2x2 matrix."""
	encoded = (
		'[{"__ndarray__": "b64:AAAAAAAA8D8AAAAAAAAAQAAAAAAAAAhAAAAAAAAAEEA=", '
		'"dtype": "float64", "shape": [2, 2], "Corder": true, "endian": "little"}]'
	)
	decoded = loads(encoded)
	assert_equal(decoded[0], array([[1.0, 2.0], [3.0, 4.0]]))


def test_empty():
	"""Arrays with at least one zero-length dimension are equal to the original after a round-trip."""
	# issue https://github.com/mverleg/pyjson_tricks/issues/76
	empty_shapes = ((1, 0), (0, 1), (0, 0))
	for data in [zeros(shape=shape) for shape in empty_shapes]:
		encoded = dumps(data)
		message = 'shape = {} ; json = {}'.format(data.shape, encoded)
		assert_equal(loads(encoded), data, message)

def test_decode_writeable():
    """Decoded arrays are writeable, for both the default and the compact encoding."""
    # issue https://github.com/mverleg/pyjson_tricks/issues/90
    original = zeros((2, 2))

    encodings = (
        dumps(original),
        dumps(original, properties={'ndarray_compact': True}),
    )
    reloaded = [loads(encoded) for encoded in encodings]

    for arr in reloaded:
        assert array_equal(original, arr)

    for arr in reloaded:
        assert arr.flags.writeable