File: ndarray.py

package info (click to toggle)
python-asdf 4.3.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 7,032 kB
  • sloc: python: 24,068; makefile: 123
file content (197 lines) | stat: -rw-r--r-- 7,673 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import numpy as np

from asdf.extension import Converter


class NDArrayConverter(Converter):
    """Converter between array objects and ASDF ``core/ndarray`` nodes.

    Serializes numpy arrays (and sub-classes), masked arrays, ``NDArrayType``
    instances and ``Stream`` objects, and rebuilds ``NDArrayType`` instances
    (or the arrays they produce) from the stored node.
    """

    tags = [
        "tag:stsci.edu:asdf/core/ndarray-1.0.0",
        "tag:stsci.edu:asdf/core/ndarray-1.1.0",
    ]
    types = [
        np.ndarray,  # listed as a type (not a string) so the extension can find sub-classes
        "numpy.ma.MaskedArray",  # in numpy 2.0
        "numpy.ma.core.MaskedArray",
        "asdf.tags.core.ndarray.NDArrayType",
        "asdf.tags.core.stream.Stream",
    ]

    def to_yaml_tree(self, obj, tag, ctx):
        """Describe ``obj`` as a dictionary suitable for an ndarray node.

        Parameters
        ----------
        obj : array-like
            A ``Stream``, an ``NDArrayType`` or a numpy (possibly masked) array.
        tag : str
            Tag being written; not used by this method.
        ctx : object
            Serialization context. Its ``_blocks`` manager supplies the
            per-array options and receives the block(s) to be written.

        Returns
        -------
        dict
            Always contains ``"shape"`` and ``"datatype"``. Inline arrays add
            ``"data"``; block-backed arrays add ``"source"`` and
            ``"byteorder"`` plus, when applicable, ``"offset"`` and
            ``"strides"``. Masked arrays additionally carry ``"mask"``.
        """
        from numpy import ma

        from asdf import config, util
        from asdf._block.options import Options
        from asdf.tags.core.ndarray import NDArrayType, numpy_array_to_list, numpy_dtype_to_asdf_datatype
        from asdf.tags.core.stream import Stream

        if isinstance(obj, Stream):
            # previously, stream never passed on data, we can do that here
            ctx._blocks.set_streamed_write_block(obj._array, obj)

            node = {
                "source": -1,
                "shape": ["*", *obj._shape],
                "datatype": obj._datatype,
                "byteorder": obj._byteorder,
            }
            if obj._strides is not None:
                node["strides"] = obj._strides
            return node

        array = obj

        # Work out which block-writing options apply to this array.
        if isinstance(obj, NDArrayType) and isinstance(obj._source, str):
            # A string source means an external block: unless other settings
            # were registered for the object, it stays external.
            write_opts = ctx._blocks.options.lookup_by_object(array)
            if write_opts is None:
                write_opts = Options("external")
        else:
            write_opts = ctx._blocks.options.get_options(array)

        # The ndarray-1.0.0 schema does not permit 0 valued strides.
        # Perhaps we'll want to allow this someday, to efficiently
        # represent an array of all the same value.
        if 0 in array.strides:
            array = np.ascontiguousarray(array)

        # Save the base array when the option asks for it, or when the option
        # is unset and the AsdfConfig default asks for it.
        cfg = config.get_config()
        use_base = write_opts.save_base
        if use_base is None:
            use_base = cfg.default_array_save_base
        base = util.get_array_base(array) if use_base else array

        # The view computations below assume a contiguous base array; when it
        # is not, copy so that a nonsense view is never written.
        if not base.flags.forc:
            array = np.ascontiguousarray(array)
            base = util.get_array_base(array)

        dims = array.shape

        # Config-wide settings may override the per-array options.
        forced_storage = cfg.all_array_storage
        if forced_storage is not None:
            write_opts.storage_type = forced_storage
        forced_compression = cfg.all_array_compression
        if forced_compression != "input":
            write_opts.compression = forced_compression
            write_opts.compression_kwargs = cfg.all_array_compression_kwargs

        threshold = cfg.array_inline_threshold
        if threshold is not None and write_opts.storage_type in ("inline", "internal"):
            write_opts.storage_type = "inline" if array.size < threshold else "internal"
        ctx._blocks.options.set_options(array, write_opts)

        # Compute the offset relative to the base array and not the
        # block data, in case the block is compressed.
        offset = array.ctypes.data - base.ctypes.data

        if array.flags.c_contiguous:
            strides = None
        else:
            strides = array.strides
        dtype, byteorder = numpy_dtype_to_asdf_datatype(
            array.dtype,
            include_byteorder=(write_opts.storage_type != "inline"),
        )

        node = {"shape": list(dims)}

        if write_opts.storage_type == "inline":
            node["data"] = numpy_array_to_list(array)
            node["datatype"] = dtype
        else:
            if write_opts.storage_type == "streamed":
                # A streamed array has an open-ended leading dimension.
                node["shape"][0] = "*"
                node["source"] = -1
                ctx._blocks.set_streamed_write_block(base, array)
            else:
                node["source"] = ctx._blocks.make_write_block(base, write_opts, obj)
            node["datatype"] = dtype
            node["byteorder"] = byteorder

            if offset > 0:
                node["offset"] = offset

            if strides is not None:
                node["strides"] = list(strides)

        if isinstance(array, ma.MaskedArray):
            # ``array.mask`` is deliberately read separately for each use.
            if write_opts.storage_type == "inline":
                ctx._blocks._set_array_storage(array.mask, "inline")

            node["mask"] = array.mask

        return node

    def from_yaml_tree(self, node, tag, ctx):
        """Rebuild an array object from a stored ndarray node.

        Parameters
        ----------
        node : list or dict
            Either a bare (inline) list of values or a dictionary holding
            ``"source"`` or ``"data"`` together with optional ``"shape"``,
            ``"datatype"``, ``"byteorder"``, ``"offset"``, ``"strides"`` and
            ``"mask"`` entries.
        tag : str
            Tag being read; not used by this method.
        ctx : object
            Deserialization context providing ``get_block_data_callback`` and
            the ``_blocks`` manager.

        Returns
        -------
        object
            The ``NDArrayType`` instance when ``ctx._blocks._lazy_load`` is
            truthy, otherwise the result of its ``_make_array()``.

        Raises
        ------
        ValueError
            If the dictionary contains both ``"source"`` and ``"data"``.
        KeyError
            If the dictionary has ``"source"`` without ``"byteorder"``, or
            has neither ``"source"`` nor ``"data"``.
        TypeError
            If ``node`` is neither a list nor a dict.
        """
        import sys
        import weakref

        from asdf.tags.core import NDArrayType
        from asdf.tags.core.ndarray import asdf_datatype_to_numpy_dtype

        if isinstance(node, list):
            lazy_array = NDArrayType(node, None, None, None, None, None, None)
            ctx._blocks._set_array_storage(lazy_array, "inline")
            return lazy_array if ctx._blocks._lazy_load else lazy_array._make_array()

        if not isinstance(node, dict):
            msg = "Invalid ndarray description."
            raise TypeError(msg)

        shape = node.get("shape", None)
        has_source = "source" in node
        if has_source and "data" in node:
            msg = "Both source and data may not be provided at the same time"
            raise ValueError(msg)
        if has_source:
            source = node["source"]
            byteorder = node["byteorder"]
        else:
            # Without a source the values come from "data" and the machine's
            # own byte order is used.
            source = node["data"]
            byteorder = sys.byteorder

        dtype = None
        if "datatype" in node:
            dtype = asdf_datatype_to_numpy_dtype(node["datatype"], byteorder)
        offset = node.get("offset", 0)
        strides = node.get("strides", None)
        mask = node.get("mask", None)

        # Positional arguments shared by every NDArrayType construction below.
        common = (source, shape, dtype, offset, strides, "A", mask)

        if isinstance(source, int):
            # internal block
            lazy_array = NDArrayType(*common, ctx.get_block_data_callback(source))
        elif isinstance(source, str):
            # external block: only a weak reference to the block manager is
            # held by the callback.
            def data_callback(_attr=None, _ref=weakref.ref(ctx._blocks)):
                if _attr not in (None, "cached_data", "data"):
                    raise AttributeError(f"_attr {_attr} is not supported")
                manager = _ref()
                if manager is None:
                    msg = "Failed to resolve reference to AsdfFile to read external block"
                    raise OSError(msg)
                loaded = manager._load_external(source)
                manager._set_array_storage(loaded, "external")
                return loaded

            lazy_array = NDArrayType(*common, data_callback)
        else:
            # inline
            lazy_array = NDArrayType(*common)
            ctx._blocks._set_array_storage(lazy_array, "inline")

        return lazy_array if ctx._blocks._lazy_load else lazy_array._make_array()

    def to_info(self, obj):
        """Return a dictionary with the ``shape`` and ``dtype`` of ``obj``."""
        return dict(shape=obj.shape, dtype=obj.dtype)