1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
from __future__ import annotations
from contextlib import suppress
import numpy as np
import pandas as pd
import pytest
import xarray as xr
from xarray.coding import variables
from xarray.conventions import decode_cf_variable, encode_cf_variable
from xarray.tests import assert_allclose, assert_equal, assert_identical, requires_dask
with suppress(ImportError):
import dask.array as da
def test_CFMaskCoder_decode() -> None:
original = xr.Variable(("x",), [0, -1, 1], {"_FillValue": -1})
expected = xr.Variable(("x",), [0, np.nan, 1])
coder = variables.CFMaskCoder()
encoded = coder.decode(original)
assert_identical(expected, encoded)
encoding_with_dtype = {
"dtype": np.dtype("float64"),
"_FillValue": np.float32(1e20),
"missing_value": np.float64(1e20),
}
encoding_without_dtype = {
"_FillValue": np.float32(1e20),
"missing_value": np.float64(1e20),
}
CFMASKCODER_ENCODE_DTYPE_CONFLICT_TESTS = {
"numeric-with-dtype": ([0.0, -1.0, 1.0], encoding_with_dtype),
"numeric-without-dtype": ([0.0, -1.0, 1.0], encoding_without_dtype),
"times-with-dtype": (pd.date_range("2000", periods=3), encoding_with_dtype),
}
@pytest.mark.parametrize(
("data", "encoding"),
CFMASKCODER_ENCODE_DTYPE_CONFLICT_TESTS.values(),
ids=list(CFMASKCODER_ENCODE_DTYPE_CONFLICT_TESTS.keys()),
)
def test_CFMaskCoder_encode_missing_fill_values_conflict(data, encoding) -> None:
original = xr.Variable(("x",), data, encoding=encoding)
encoded = encode_cf_variable(original)
assert encoded.dtype == encoded.attrs["missing_value"].dtype
assert encoded.dtype == encoded.attrs["_FillValue"].dtype
roundtripped = decode_cf_variable("foo", encoded)
assert_identical(roundtripped, original)
def test_CFMaskCoder_missing_value() -> None:
expected = xr.DataArray(
np.array([[26915, 27755, -9999, 27705], [25595, -9999, 28315, -9999]]),
dims=["npts", "ntimes"],
name="tmpk",
)
expected.attrs["missing_value"] = -9999
decoded = xr.decode_cf(expected.to_dataset())
encoded, _ = xr.conventions.cf_encoder(decoded.variables, decoded.attrs)
assert_equal(encoded["tmpk"], expected.variable)
decoded.tmpk.encoding["_FillValue"] = -9940
with pytest.raises(ValueError):
encoded, _ = xr.conventions.cf_encoder(decoded.variables, decoded.attrs)
@requires_dask
def test_CFMaskCoder_decode_dask() -> None:
original = xr.Variable(("x",), [0, -1, 1], {"_FillValue": -1}).chunk()
expected = xr.Variable(("x",), [0, np.nan, 1])
coder = variables.CFMaskCoder()
encoded = coder.decode(original)
assert isinstance(encoded.data, da.Array)
assert_identical(expected, encoded)
# TODO(shoyer): port other fill-value tests
# TODO(shoyer): parameterize when we have more coders
def test_coder_roundtrip() -> None:
original = xr.Variable(("x",), [0.0, np.nan, 1.0])
coder = variables.CFMaskCoder()
roundtripped = coder.decode(coder.encode(original))
assert_identical(original, roundtripped)
@pytest.mark.parametrize("dtype", ["u1", "u2", "i1", "i2", "f2", "f4"])
@pytest.mark.parametrize("dtype2", ["f4", "f8"])
def test_scaling_converts_to_float(dtype: str, dtype2: str) -> None:
dt = np.dtype(dtype2)
original = xr.Variable(
("x",), np.arange(10, dtype=dtype), encoding=dict(scale_factor=dt.type(10))
)
coder = variables.CFScaleOffsetCoder()
encoded = coder.encode(original)
assert encoded.dtype == dt
roundtripped = coder.decode(encoded)
assert_identical(original, roundtripped)
assert roundtripped.dtype == dt
@pytest.mark.parametrize("scale_factor", (10, [10]))
@pytest.mark.parametrize("add_offset", (0.1, [0.1]))
def test_scaling_offset_as_list(scale_factor, add_offset) -> None:
# test for #4631
encoding = dict(scale_factor=scale_factor, add_offset=add_offset)
original = xr.Variable(("x",), np.arange(10.0), encoding=encoding)
coder = variables.CFScaleOffsetCoder()
encoded = coder.encode(original)
roundtripped = coder.decode(encoded)
assert_allclose(original, roundtripped)
@pytest.mark.parametrize("bits", [1, 2, 4, 8])
def test_decode_unsigned_from_signed(bits) -> None:
unsigned_dtype = np.dtype(f"u{bits}")
signed_dtype = np.dtype(f"i{bits}")
original_values = np.array([np.iinfo(unsigned_dtype).max], dtype=unsigned_dtype)
encoded = xr.Variable(
("x",), original_values.astype(signed_dtype), attrs={"_Unsigned": "true"}
)
coder = variables.CFMaskCoder()
decoded = coder.decode(encoded)
assert decoded.dtype == unsigned_dtype
assert decoded.values == original_values
@pytest.mark.parametrize("bits", [1, 2, 4, 8])
def test_decode_signed_from_unsigned(bits) -> None:
unsigned_dtype = np.dtype(f"u{bits}")
signed_dtype = np.dtype(f"i{bits}")
original_values = np.array([-1], dtype=signed_dtype)
encoded = xr.Variable(
("x",), original_values.astype(unsigned_dtype), attrs={"_Unsigned": "false"}
)
coder = variables.CFMaskCoder()
decoded = coder.decode(encoded)
assert decoded.dtype == signed_dtype
assert decoded.values == original_values
|