File: common.py

package info (click to toggle)
numcodecs 0.16.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 976 kB
  • sloc: python: 4,266; makefile: 43
file content (289 lines) | stat: -rw-r--r-- 9,401 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
import array
import json as _json
import os
from glob import glob

import numpy as np
import pytest
from numpy.testing import assert_array_almost_equal, assert_array_equal

from numcodecs import *  # noqa: F403  # for eval to find names in repr tests
from numcodecs.compat import ensure_bytes, ensure_ndarray
from numcodecs.registry import get_codec

greetings = [
    '¡Hola mundo!',
    'Hej Världen!',
    'Servus Woid!',
    'Hei maailma!',
    'Xin chào thế giới',
    'Njatjeta Botë!',
    'Γεια σου κόσμε!',  # noqa: RUF001
    'こんにちは世界',
    '世界,你好!',  # noqa: RUF001
    'Helló, világ!',
    'Zdravo svete!',
    'เฮลโลเวิลด์',
]


def compare_arrays(arr, res, precision=None):
    # ensure numpy array with matching dtype
    res = ensure_ndarray(res).view(arr.dtype)

    # convert to correct shape
    if arr.flags.f_contiguous:
        order = 'F'
    else:
        order = 'C'
    res = res.reshape(arr.shape, order=order)

    # exact compare
    if precision is None:
        assert_array_equal(arr, res)

    # fuzzy compare
    else:
        assert_array_almost_equal(arr, res, decimal=precision)


def check_encode_decode(arr, codec, precision=None):
    # N.B., watch out here with blosc compressor, if the itemsize of
    # the source buffer is different then the results of encoding
    # (i.e., compression) may be different. Hence we *do not* require that
    # the results of encoding be identical for all possible inputs, rather
    # we just require that the results of the encode/decode round-trip can
    # be compared to the original array.

    # encoding should support any object exporting the buffer protocol

    # test encoding of numpy array
    enc = codec.encode(arr)
    dec = codec.decode(enc)
    compare_arrays(arr, dec, precision=precision)

    # test encoding of bytes
    buf = arr.tobytes(order='A')
    enc = codec.encode(buf)
    dec = codec.decode(enc)
    compare_arrays(arr, dec, precision=precision)

    # test encoding of bytearray
    buf = bytearray(arr.tobytes(order='A'))
    enc = codec.encode(buf)
    dec = codec.decode(enc)
    compare_arrays(arr, dec, precision=precision)

    # test encoding of array.array
    buf = array.array('b', arr.tobytes(order='A'))
    enc = codec.encode(buf)
    dec = codec.decode(enc)
    compare_arrays(arr, dec, precision=precision)

    # decoding should support any object exporting the buffer protocol,

    # setup
    enc_bytes = ensure_bytes(enc)

    # test decoding of raw bytes
    dec = codec.decode(enc_bytes)
    compare_arrays(arr, dec, precision=precision)

    # test decoding of bytearray
    dec = codec.decode(bytearray(enc_bytes))
    compare_arrays(arr, dec, precision=precision)

    # test decoding of array.array
    buf = array.array('b', enc_bytes)
    dec = codec.decode(buf)
    compare_arrays(arr, dec, precision=precision)

    # test decoding of numpy array
    buf = np.frombuffer(enc_bytes, dtype='u1')
    dec = codec.decode(buf)
    compare_arrays(arr, dec, precision=precision)

    # test decoding directly into numpy array
    out = np.empty_like(arr)
    codec.decode(enc_bytes, out=out)
    compare_arrays(arr, out, precision=precision)

    # test decoding directly into bytearray
    out = bytearray(arr.nbytes)
    codec.decode(enc_bytes, out=out)
    # noinspection PyTypeChecker
    compare_arrays(arr, out, precision=precision)


def assert_array_items_equal(res, arr):
    assert isinstance(res, np.ndarray)
    res = res.reshape(-1, order='A')
    arr = arr.reshape(-1, order='A')
    assert res.shape == arr.shape
    assert res.dtype == arr.dtype

    # numpy asserts don't compare object arrays
    # properly; assert that we have the same nans
    # and values
    arr = arr.ravel().tolist()
    res = res.ravel().tolist()
    for a, r in zip(arr, res, strict=True):
        if isinstance(a, np.ndarray):
            assert_array_equal(a, r)
        elif a != a:
            assert r != r
        else:
            assert a == r


def check_encode_decode_array(arr, codec):
    enc = codec.encode(arr)
    dec = codec.decode(enc)
    assert_array_items_equal(arr, dec)

    out = np.empty_like(arr)
    codec.decode(enc, out=out)
    assert_array_items_equal(arr, out)

    enc = codec.encode(arr)
    dec = codec.decode(ensure_ndarray(enc))
    assert_array_items_equal(arr, dec)


def check_encode_decode_array_to_bytes(arr, codec):
    enc = codec.encode(arr)
    dec = codec.decode(enc)
    assert_array_items_equal(arr, dec)

    out = np.empty_like(arr)
    codec.decode(enc, out=out)
    assert_array_items_equal(arr, out)


def check_config(codec):
    config = codec.get_config()
    # round-trip through JSON to check serialization
    config = _json.loads(_json.dumps(config))
    assert codec == get_codec(config)


def check_repr(stmt):
    # check repr matches instantiation statement
    codec = eval(stmt)
    actual = repr(codec)
    assert stmt == actual


def check_backwards_compatibility(codec_id, arrays, codecs, precision=None, prefix=None):
    # setup directory to hold data fixture
    if prefix:
        fixture_dir = os.path.join('fixture', codec_id, prefix)
    else:
        fixture_dir = os.path.join('fixture', codec_id)
    if not os.path.exists(fixture_dir):  # pragma: no cover
        os.makedirs(fixture_dir)

    # save fixture data
    for i, arr in enumerate(arrays):
        arr_fn = os.path.join(fixture_dir, f'array.{i:02d}.npy')
        if not os.path.exists(arr_fn):  # pragma: no cover
            np.save(arr_fn, arr)

    # load fixture data
    for arr_fn in glob(os.path.join(fixture_dir, 'array.*.npy')):
        # setup
        i = int(arr_fn.split('.')[-2])
        arr = np.load(arr_fn, allow_pickle=True)
        arr_bytes = arr.tobytes(order='A')
        if arr.flags.f_contiguous:
            order = 'F'
        else:
            order = 'C'

        for j, codec in enumerate(codecs):
            if codec is None:
                continue

            # setup a directory to hold encoded data
            codec_dir = os.path.join(fixture_dir, f'codec.{j:02d}')
            if not os.path.exists(codec_dir):  # pragma: no cover
                os.makedirs(codec_dir)

            # file with codec configuration information
            codec_fn = os.path.join(codec_dir, 'config.json')
            # one time save config
            if not os.path.exists(codec_fn):  # pragma: no cover
                with open(codec_fn, mode='w') as cf:
                    _json.dump(codec.get_config(), cf, sort_keys=True, indent=4)
            # load config and compare with expectation
            with open(codec_fn) as cf:
                config = _json.load(cf)
                assert codec == get_codec(config)

            enc_fn = os.path.join(codec_dir, f'encoded.{i:02d}.dat')

            # one time encode and save array
            if not os.path.exists(enc_fn):  # pragma: no cover
                enc = codec.encode(arr)
                enc = ensure_bytes(enc)
                with open(enc_fn, mode='wb') as ef:
                    ef.write(enc)

            # load and decode data
            with open(enc_fn, mode='rb') as ef:
                enc = ef.read()
                dec = codec.decode(enc)
                dec_arr = ensure_ndarray(dec).reshape(-1, order='A')
                dec_arr = dec_arr.view(dtype=arr.dtype).reshape(arr.shape, order=order)
                if precision and precision[j] is not None:
                    assert_array_almost_equal(arr, dec_arr, decimal=precision[j])
                elif arr.dtype == 'object':
                    assert_array_items_equal(arr, dec_arr)
                else:
                    assert_array_equal(arr, dec_arr)
                    assert arr_bytes == ensure_bytes(dec)

    if os.path.exists(fixture_dir):
        import shutil
        shutil.rmtree(fixture_dir)


def check_err_decode_object_buffer(compressor):
    # cannot decode directly into object array, leads to segfaults
    a = np.arange(10)
    enc = compressor.encode(a)
    out = np.empty(10, dtype=object)
    with pytest.raises(TypeError):
        compressor.decode(enc, out=out)


def check_err_encode_object_buffer(compressor):
    # compressors cannot encode object array
    a = np.array(['foo', 'bar', 'baz'], dtype=object)
    with pytest.raises(TypeError):
        compressor.encode(a)


def check_max_buffer_size(codec):
    for max_buffer_size in (4, 64, 1024):
        old_max_buffer_size = codec.max_buffer_size
        try:
            codec.max_buffer_size = max_buffer_size
            # Just up the max_buffer_size is fine.
            codec.encode(np.zeros(max_buffer_size - 1, dtype=np.int8))
            codec.encode(np.zeros(max_buffer_size, dtype=np.int8))

            buffers = [
                bytes(b"x" * (max_buffer_size + 1)),
                np.zeros(max_buffer_size + 1, dtype=np.int8),
                np.zeros(max_buffer_size + 2, dtype=np.int8),
                np.zeros(max_buffer_size, dtype=np.int16),
                np.zeros(max_buffer_size, dtype=np.int32),
            ]
            for buf in buffers:
                with pytest.raises(ValueError):
                    codec.encode(buf)
                with pytest.raises(ValueError):
                    codec.decode(buf)
        finally:
            codec.max_buffer_size = old_max_buffer_size