"""Coders for individual Variable objects."""
import warnings
from functools import partial
from typing import Any, Hashable
import numpy as np
import pandas as pd
from ..core import dtypes, duck_array_ops, indexing
from ..core.pycompat import is_duck_dask_array
from ..core.variable import Variable


class SerializationWarning(RuntimeWarning):
"""Warnings about encoding/decoding issues in serialization."""


class VariableCoder:
"""Base class for encoding and decoding transformations on variables.
We use coders for transforming variables between xarray's data model and
a format suitable for serialization. For example, coders apply CF
conventions for how data should be represented in netCDF files.
Subclasses should implement encode() and decode(), which should satisfy
the identity ``coder.decode(coder.encode(variable)) == variable``. If any
options are necessary, they should be implemented as arguments to the
__init__ method.
The optional name argument to encode() and decode() exists solely for the
sake of better error messages, and should correspond to the name of
variables in the underlying store.
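
    For instance, a round trip through one of the concrete coders defined
    below is expected to look roughly like this (an illustrative sketch,
    not a doctest; ``variable`` stands for any decoded Variable)::

        coder = CFMaskCoder()
        encoded = coder.encode(variable, name="t")      # apply CF fill values
        roundtripped = coder.decode(encoded, name="t")  # mask them again
        # ``roundtripped`` should be equivalent to the original ``variable``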
"""
def encode(
self, variable: Variable, name: Hashable = None
) -> Variable: # pragma: no cover
"""Convert an encoded variable to a decoded variable"""
raise NotImplementedError()
def decode(
self, variable: Variable, name: Hashable = None
) -> Variable: # pragma: no cover
"""Convert an decoded variable to a encoded variable"""
raise NotImplementedError()


class _ElementwiseFunctionArray(indexing.ExplicitlyIndexedNDArrayMixin):
"""Lazily computed array holding values of elemwise-function.
Do not construct this object directly: call lazy_elemwise_func instead.
Values are computed upon indexing or coercion to a NumPy array.
"""
def __init__(self, array, func, dtype):
assert not is_duck_dask_array(array)
self.array = indexing.as_indexable(array)
self.func = func
self._dtype = dtype
@property
def dtype(self):
return np.dtype(self._dtype)
def __getitem__(self, key):
return type(self)(self.array[key], self.func, self.dtype)
def __array__(self, dtype=None):
return self.func(self.array)
def __repr__(self):
return "{}({!r}, func={!r}, dtype={!r})".format(
type(self).__name__, self.array, self.func, self.dtype
)


def lazy_elemwise_func(array, func, dtype):
"""Lazily apply an element-wise function to an array.
Parameters
----------
array : any valid value of Variable._data
func : callable
Function to apply to indexed slices of an array. For use with dask,
this should be a pickle-able object.
dtype : coercible to np.dtype
Dtype for the result of this function.
Returns
-------
Either a dask.array.Array or _ElementwiseFunctionArray.
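
    Examples
    --------
    A rough sketch of how the coders in this module use this helper
    (illustrative only; ``data`` stands for any valid ``Variable._data``
    value, and ``func`` is only applied on access)::

        transform = partial(np.asarray, dtype=np.float64)
        lazy = lazy_elemwise_func(data, transform, np.float64)
        np.asarray(lazy)  # ``transform`` is only applied at this point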
"""
if is_duck_dask_array(array):
import dask.array as da
return da.map_blocks(func, array, dtype=dtype)
else:
return _ElementwiseFunctionArray(array, func, dtype)


def unpack_for_encoding(var):
    return var.dims, var.data, var.attrs.copy(), var.encoding.copy()


def unpack_for_decoding(var):
    # Work on the private ``_data`` so that lazily indexed backend arrays are
    # not loaded into memory just to set up decoding.
    return var.dims, var._data, var.attrs.copy(), var.encoding.copy()


def safe_setitem(dest, key, value, name=None):
    """Set ``dest[key] = value``, raising a ValueError if ``key`` is already present."""
    if key in dest:
var_str = f" on variable {name!r}" if name else ""
raise ValueError(
"failed to prevent overwriting existing key {} in attrs{}. "
"This is probably an encoding field used by xarray to describe "
"how a variable is serialized. To proceed, remove this key from "
"the variable's attributes manually.".format(key, var_str)
)
dest[key] = value


def pop_to(source, dest, key, name=None):
"""
    A convenience function that pops ``key`` from ``source`` into ``dest``.
    None values are not passed on. If ``key`` already exists in ``dest``,
    an error is raised.
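
    For instance (illustrative)::

        attrs, encoding = {"_FillValue": -1}, {}
        pop_to(attrs, encoding, "_FillValue")  # returns -1
        # attrs is now {}, and encoding is {"_FillValue": -1}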
"""
value = source.pop(key, None)
if value is not None:
safe_setitem(dest, key, value, name=name)
return value


def _apply_mask(
data: np.ndarray, encoded_fill_values: list, decoded_fill_value: Any, dtype: Any
) -> np.ndarray:
"""Mask all matching values in a NumPy arrays."""
data = np.asarray(data, dtype=dtype)
condition = False
for fv in encoded_fill_values:
condition |= data == fv
return np.where(condition, decoded_fill_value, data)


class CFMaskCoder(VariableCoder):
"""Mask or unmask fill values according to CF conventions."""
def encode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_encoding(variable)
dtype = np.dtype(encoding.get("dtype", data.dtype))
fv = encoding.get("_FillValue")
mv = encoding.get("missing_value")
if (
fv is not None
and mv is not None
and not duck_array_ops.allclose_or_equiv(fv, mv)
):
raise ValueError(
f"Variable {name!r} has conflicting _FillValue ({fv}) and missing_value ({mv}). Cannot encode data."
)
if fv is not None:
# Ensure _FillValue is cast to same dtype as data's
encoding["_FillValue"] = dtype.type(fv)
fill_value = pop_to(encoding, attrs, "_FillValue", name=name)
if not pd.isnull(fill_value):
data = duck_array_ops.fillna(data, fill_value)
if mv is not None:
# Ensure missing_value is cast to same dtype as data's
encoding["missing_value"] = dtype.type(mv)
fill_value = pop_to(encoding, attrs, "missing_value", name=name)
if not pd.isnull(fill_value) and fv is None:
data = duck_array_ops.fillna(data, fill_value)
return Variable(dims, data, attrs, encoding)
def decode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_decoding(variable)
raw_fill_values = [
pop_to(attrs, encoding, attr, name=name)
for attr in ("missing_value", "_FillValue")
]
if raw_fill_values:
encoded_fill_values = {
fv
for option in raw_fill_values
for fv in np.ravel(option)
if not pd.isnull(fv)
}
if len(encoded_fill_values) > 1:
warnings.warn(
"variable {!r} has multiple fill values {}, "
"decoding all values to NaN.".format(name, encoded_fill_values),
SerializationWarning,
stacklevel=3,
)
dtype, decoded_fill_value = dtypes.maybe_promote(data.dtype)
if encoded_fill_values:
transform = partial(
_apply_mask,
encoded_fill_values=encoded_fill_values,
decoded_fill_value=decoded_fill_value,
dtype=dtype,
)
data = lazy_elemwise_func(data, transform, dtype)
return Variable(dims, data, attrs, encoding)


def _scale_offset_decoding(data, scale_factor, add_offset, dtype):
    # Apply the CF decoding formula ``decoded = encoded * scale_factor + add_offset``
    # on a copy of the data cast to the requested floating-point dtype.
data = np.array(data, dtype=dtype, copy=True)
if scale_factor is not None:
data *= scale_factor
if add_offset is not None:
data += add_offset
return data


def _choose_float_dtype(dtype, has_offset):
"""Return a float dtype that can losslessly represent `dtype` values."""
# Keep float32 as-is. Upcast half-precision to single-precision,
# because float16 is "intended for storage but not computation"
if dtype.itemsize <= 4 and np.issubdtype(dtype, np.floating):
return np.float32
# float32 can exactly represent all integers up to 24 bits
if dtype.itemsize <= 2 and np.issubdtype(dtype, np.integer):
# A scale factor is entirely safe (vanishing into the mantissa),
# but a large integer offset could lead to loss of precision.
# Sensitivity analysis can be tricky, so we just use a float64
# if there's any offset at all - better unoptimised than wrong!
if not has_offset:
return np.float32
# For all other types and circumstances, we just use float64.
# (safe because eg. complex numbers are not supported in NetCDF)
return np.float64


class CFScaleOffsetCoder(VariableCoder):
"""Scale and offset variables according to CF conventions.
    Follows the formula:

        decoded_values = encoded_values * scale_factor + add_offset
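
    An illustrative sketch of the round trip (not a doctest)::

        var = Variable(("x",), np.array([10.0, 20.0]),
                       encoding={"scale_factor": 2.0, "add_offset": 1.0})
        enc = CFScaleOffsetCoder().encode(var)  # stores (data - 1.0) / 2.0
        dec = CFScaleOffsetCoder().decode(enc)  # recovers the original values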
"""
def encode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_encoding(variable)
if "scale_factor" in encoding or "add_offset" in encoding:
dtype = _choose_float_dtype(data.dtype, "add_offset" in encoding)
data = data.astype(dtype=dtype, copy=True)
if "add_offset" in encoding:
data -= pop_to(encoding, attrs, "add_offset", name=name)
if "scale_factor" in encoding:
data /= pop_to(encoding, attrs, "scale_factor", name=name)
return Variable(dims, data, attrs, encoding)
def decode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_decoding(variable)
if "scale_factor" in attrs or "add_offset" in attrs:
scale_factor = pop_to(attrs, encoding, "scale_factor", name=name)
add_offset = pop_to(attrs, encoding, "add_offset", name=name)
dtype = _choose_float_dtype(data.dtype, "add_offset" in attrs)
if np.ndim(scale_factor) > 0:
scale_factor = scale_factor.item()
if np.ndim(add_offset) > 0:
add_offset = add_offset.item()
transform = partial(
_scale_offset_decoding,
scale_factor=scale_factor,
add_offset=add_offset,
dtype=dtype,
)
data = lazy_elemwise_func(data, transform, dtype)
return Variable(dims, data, attrs, encoding)


class UnsignedIntegerCoder(VariableCoder):
    """Handle the netCDF ``_Unsigned`` convention: unsigned integer values
    stored in, and restored from, a signed integer dtype."""
def encode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_encoding(variable)
# from netCDF best practices
# https://www.unidata.ucar.edu/software/netcdf/docs/BestPractices.html
# "_Unsigned = "true" to indicate that
# integer data should be treated as unsigned"
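        # For example (illustrative): uint8 data [200, 255] is written as the
        # signed int8 values [-56, -1]; decode() below reverses this by
        # reinterpreting the signed bytes as unsigned again.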
if encoding.get("_Unsigned", "false") == "true":
pop_to(encoding, attrs, "_Unsigned")
signed_dtype = np.dtype("i%s" % data.dtype.itemsize)
if "_FillValue" in attrs:
new_fill = signed_dtype.type(attrs["_FillValue"])
attrs["_FillValue"] = new_fill
data = duck_array_ops.around(data).astype(signed_dtype)
return Variable(dims, data, attrs, encoding)
def decode(self, variable, name=None):
dims, data, attrs, encoding = unpack_for_decoding(variable)
if "_Unsigned" in attrs:
unsigned = pop_to(attrs, encoding, "_Unsigned")
if data.dtype.kind == "i":
if unsigned == "true":
unsigned_dtype = np.dtype("u%s" % data.dtype.itemsize)
transform = partial(np.asarray, dtype=unsigned_dtype)
data = lazy_elemwise_func(data, transform, unsigned_dtype)
if "_FillValue" in attrs:
new_fill = unsigned_dtype.type(attrs["_FillValue"])
attrs["_FillValue"] = new_fill
else:
warnings.warn(
"variable %r has _Unsigned attribute but is not "
"of integer type. Ignoring attribute." % name,
SerializationWarning,
stacklevel=3,
)
return Variable(dims, data, attrs, encoding)