File: resample.py

package info (click to toggle)
python-xarray 2023.01.0-1.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 8,980 kB
  • sloc: python: 86,209; makefile: 232; sh: 47
file content (425 lines) | stat: -rw-r--r-- 14,708 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, Callable, Hashable, Iterable, Sequence

import numpy as np

from xarray.core._aggregations import (
    DataArrayResampleAggregations,
    DatasetResampleAggregations,
)
from xarray.core.groupby import DataArrayGroupByBase, DatasetGroupByBase, GroupBy
from xarray.core.types import Dims, InterpOptions, T_Xarray

if TYPE_CHECKING:
    from xarray.core.dataarray import DataArray
    from xarray.core.dataset import Dataset

# Name of the temporary "proxy" dimension that resampled results carry.
# The Resample classes below rename it back to the real resampling
# dimension before returning a result.
RESAMPLE_DIM = "__resample_dim__"


class Resample(GroupBy[T_Xarray]):
    """An object that extends the `GroupBy` object with additional logic
    for handling specialized re-sampling operations.

    You should create a `Resample` object by using the `DataArray.resample` or
    `Dataset.resample` methods. ``dim`` names the dimension being re-sampled,
    while ``resample_dim`` names the temporary proxy dimension; the two must
    differ.

    See Also
    --------
    DataArray.resample
    Dataset.resample

    """

    def __init__(
        self,
        *args,
        dim: Hashable | None = None,
        resample_dim: Hashable | None = None,
        **kwargs,
    ) -> None:
        """Record the re-sampling dimension and initialise the parent GroupBy.

        Parameters
        ----------
        *args
            Positional arguments forwarded unchanged to ``GroupBy.__init__``.
        dim : Hashable, optional
            Name of the dimension being re-sampled; stored as ``self._dim``.
        resample_dim : Hashable, optional
            Name of the proxy dimension. Only used here to check that it
            differs from ``dim``; it is not stored.
        **kwargs
            Keyword arguments forwarded unchanged to ``GroupBy.__init__``.

        Raises
        ------
        ValueError
            If ``dim`` and ``resample_dim`` compare equal.
        """
        # NOTE(review): this also triggers when both arguments are left at
        # their ``None`` default, so callers are expected to pass both.
        if dim == resample_dim:
            raise ValueError(
                f"Proxy resampling dimension ('{resample_dim}') "
                f"cannot have the same name as actual dimension ('{dim}')!"
            )
        self._dim = dim

        super().__init__(*args, **kwargs)

    def _flox_reduce(
        self,
        dim: Dims,
        keep_attrs: bool | None = None,
        **kwargs,
    ) -> T_Xarray:
        """Run the parent flox reduction with an explicit per-element label.

        Parameters
        ----------
        dim : Dims
            Dimension(s) to reduce over; forwarded to the parent method.
        keep_attrs : bool, optional
            Forwarded to the parent method.
        **kwargs
            Forwarded to the parent method. ``method`` defaults to
            ``"cohorts"`` when the caller does not supply it.

        Returns
        -------
        reduced : DataArray or Dataset
            The reduced object with the proxy dimension renamed to the
            grouped dimension.
        """
        from xarray.core.dataarray import DataArray

        kwargs.setdefault("method", "cohorts")

        # now create a label DataArray since resample doesn't do that somehow
        # Each group index is a slice; its length says how many consecutive
        # elements along the grouped dimension share that group's label.
        repeats = []
        for slicer in self._group_indices:
            assert isinstance(slicer, slice)
            # An open-ended slice runs to the end of the grouped dimension.
            stop = (
                slicer.stop
                if slicer.stop is not None
                else self._obj.sizes[self._group_dim]
            )
            repeats.append(stop - slicer.start)
        labels = np.repeat(self._unique_coord.data, repeats)
        group = DataArray(labels, dims=(self._group_dim,), name=self._unique_coord.name)

        result = super()._flox_reduce(
            dim=dim, group=group, keep_attrs=keep_attrs, **kwargs
        )
        result = self._maybe_restore_empty_groups(result)
        result = result.rename({RESAMPLE_DIM: self._group_dim})
        return result

    def _drop_coords(self) -> T_Xarray:
        """Drop non-dimension coordinates along the resampled dimension.

        Returns a new object; ``self._obj`` itself is not modified here.
        """
        obj = self._obj
        for k, v in obj.coords.items():
            # Keep the resampled dimension's own coordinate; drop every other
            # coordinate that varies along that dimension.
            if k != self._dim and self._dim in v.dims:
                obj = obj.drop_vars(k)
        return obj

    def pad(self, tolerance: float | Iterable[float] | None = None) -> T_Xarray:
        """Forward fill new values at up-sampled frequency.

        Parameters
        ----------
        tolerance : float | Iterable[float] | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        padded : DataArray or Dataset
        """
        obj = self._drop_coords()
        return obj.reindex(
            {self._dim: self._full_index}, method="pad", tolerance=tolerance
        )

    ffill = pad

    def backfill(self, tolerance: float | Iterable[float] | None = None) -> T_Xarray:
        """Backward fill new values at up-sampled frequency.

        Parameters
        ----------
        tolerance : float | Iterable[float] | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        backfilled : DataArray or Dataset
        """
        obj = self._drop_coords()
        return obj.reindex(
            {self._dim: self._full_index}, method="backfill", tolerance=tolerance
        )

    bfill = backfill

    def nearest(self, tolerance: float | Iterable[float] | None = None) -> T_Xarray:
        """Take new values from nearest original coordinate to up-sampled
        frequency coordinates.

        Parameters
        ----------
        tolerance : float | Iterable[float] | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        upsampled : DataArray or Dataset
        """
        obj = self._drop_coords()
        return obj.reindex(
            {self._dim: self._full_index}, method="nearest", tolerance=tolerance
        )

    def interpolate(self, kind: InterpOptions = "linear", **kwargs: Any) -> T_Xarray:
        """Interpolate up-sampled data using the original data as knots.

        Parameters
        ----------
        kind : {"linear", "nearest", "zero", "slinear", \
                "quadratic", "cubic", "polynomial"}, default: "linear"
            The method used to interpolate. The method should be supported by
            the scipy interpolator:

            - ``interp1d``: {"linear", "nearest", "zero", "slinear",
              "quadratic", "cubic", "polynomial"}
            - ``interpn``: {"linear", "nearest"}

            If ``"polynomial"`` is passed, the ``order`` keyword argument must
            also be provided.
        **kwargs
            Additional keyword arguments handed to the interpolator through
            the ``kwargs`` argument of ``interp`` (for example ``order`` when
            ``kind="polynomial"``). ``bounds_error`` defaults to ``False``
            unless given explicitly.

        Returns
        -------
        interpolated : DataArray or Dataset

        See Also
        --------
        DataArray.interp
        Dataset.interp
        scipy.interpolate.interp1d

        """
        return self._interpolate(kind=kind, **kwargs)

    def _interpolate(self, kind="linear", **kwargs: Any) -> T_Xarray:
        """Apply scipy.interpolate.interp1d along resampling dimension.

        Extra keyword arguments are forwarded to the interpolator;
        ``bounds_error`` falls back to ``False`` when not supplied.
        """
        obj = self._drop_coords()
        # Keep the historical default of not raising for out-of-bounds
        # targets while still allowing the caller to override it.
        kwargs.setdefault("bounds_error", False)
        return obj.interp(
            coords={self._dim: self._full_index},
            assume_sorted=True,
            method=kind,
            kwargs=kwargs,
        )


# https://github.com/python/mypy/issues/9031
class DataArrayResample(Resample["DataArray"], DataArrayGroupByBase, DataArrayResampleAggregations):  # type: ignore[misc]
    """Resampling-specific variant of the DataArray group-by object, operating
    over one chosen dimension.
    """

    def map(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...] = (),
        shortcut: bool | None = False,
        **kwargs: Any,
    ) -> DataArray:
        """Call ``func`` on every group's array and stitch the outputs back
        together into a single array.

        Each group ``ar`` is processed as ``func(ar, *args, **kwargs)``.

        As with `pandas.GroupBy.apply`, a heuristic decides how the pieces
        are stacked:

        1. When the first result still has the dimension on which the group
           coordinate is defined, the pieces are stacked along it.
        2. Otherwise they are stacked along a new dimension named after the
           grouping.

        Parameters
        ----------
        func : callable
            Function invoked once per group.
        args : tuple, optional
            Extra positional arguments for `func`.
        shortcut : bool, optional
            Enables a faster evaluation path, valid only if:

            (1) `func` looks solely at data and dimensions, never at other
                array metadata (attributes or coordinates).
            (2) `func` yields arrays whose metadata is homogeneous, i.e.
                identical dimensions and attributes.

            Many routine group-by operations (numpy ufuncs, for instance)
            meet both requirements and gain a notable speedup.
        **kwargs
            Extra keyword arguments for `func`.

        Returns
        -------
        applied : DataArray
            Outcome of the split / apply / combine sequence.
        """
        # TODO: the argument order for Resample doesn't match that for its parent,
        # GroupBy
        result = super().map(func, shortcut=shortcut, args=args, **kwargs)

        resampled_dim = self._dim

        # When `func` left the original resampling coordinate in place it has
        # to go first; otherwise the proxy dimension could not take its name.
        if resampled_dim in result.coords:
            result = result.drop_vars(resampled_dim)

        if RESAMPLE_DIM in result.dims:
            result = result.rename({RESAMPLE_DIM: resampled_dim})

        return result

    def apply(self, func, args=(), shortcut=None, **kwargs):
        """
        Legacy spelling of ``map``, retained for backward compatibility.

        Emits a ``PendingDeprecationWarning`` and then delegates to ``map``.

        See Also
        --------
        DataArrayResample.map
        """
        message = "Resample.apply may be deprecated in the future. Using Resample.map is encouraged"
        warnings.warn(message, category=PendingDeprecationWarning, stacklevel=2)
        return self.map(func=func, shortcut=shortcut, args=args, **kwargs)

    def asfreq(self) -> DataArray:
        """Return values of original object at the new up-sampling frequency;
        essentially a re-index with new times set to NaN.

        Implemented as a mean over the resampling dimension. Note that
        ``self._obj`` is replaced by a copy without the non-dimension
        coordinates along that dimension.

        Returns
        -------
        resampled : DataArray
        """
        self._obj = self._drop_coords()
        if self._dim is None:
            return self.mean(None)
        return self.mean([self._dim])


# https://github.com/python/mypy/issues/9031
class DatasetResample(Resample["Dataset"], DatasetGroupByBase, DatasetResampleAggregations):  # type: ignore[misc]
    """Resampling-specific variant of the Dataset group-by object, operating
    over one chosen dimension.
    """

    def map(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...] = (),
        shortcut: bool | None = None,
        **kwargs: Any,
    ) -> Dataset:
        """Call ``func`` on every Dataset produced by the resampling groups
        and stitch the outputs back together into a single Dataset.

        Each group ``ds`` is processed as ``func(ds, *args, **kwargs)``.

        As with `pandas.GroupBy.apply`, a heuristic decides how the pieces
        are stacked:

        1. When the first result still has the dimension on which the group
           coordinate is defined, the pieces are stacked along it.
        2. Otherwise they are stacked along a new dimension named after the
           grouping.

        Parameters
        ----------
        func : callable
            Function invoked once per sub-dataset.
        args : tuple, optional
            Extra positional arguments for `func`.
        shortcut : bool, optional
            Accepted for signature compatibility; currently not used.
        **kwargs
            Extra keyword arguments for `func`.

        Returns
        -------
        applied : Dataset
            Outcome of the split / apply / combine sequence.
        """
        # ignore shortcut if set (for now)
        groups = self._iter_grouped()
        pieces = (func(group_ds, *args, **kwargs) for group_ds in groups)
        result = self._combine(pieces)

        resampled_dim = self._dim

        # When `func` left the original resampling coordinate in place it has
        # to go first; otherwise the proxy dimension could not take its name.
        if resampled_dim in result.coords:
            result = result.drop_vars(resampled_dim)

        if RESAMPLE_DIM in result.dims:
            result = result.rename({RESAMPLE_DIM: resampled_dim})

        return result

    def apply(self, func, args=(), shortcut=None, **kwargs):
        """
        Legacy spelling of ``map``, retained for backward compatibility.

        Emits a ``PendingDeprecationWarning`` and then delegates to ``map``.

        See Also
        --------
        DatasetResample.map
        """
        message = "Resample.apply may be deprecated in the future. Using Resample.map is encouraged"
        warnings.warn(message, category=PendingDeprecationWarning, stacklevel=2)
        return self.map(func=func, shortcut=shortcut, args=args, **kwargs)

    def reduce(
        self,
        func: Callable[..., Any],
        dim: Dims = None,
        *,
        axis: int | Sequence[int] | None = None,
        keep_attrs: bool | None = None,
        keepdims: bool = False,
        shortcut: bool = True,
        **kwargs: Any,
    ) -> Dataset:
        """Collapse each group by running `func` along the pre-defined
        resampling dimension.

        Every argument is handed unchanged to the parent class's ``reduce``.

        Parameters
        ----------
        func : callable
            Reducer callable as `func(x, axis=axis, **kwargs)` that collapses
            an np.ndarray over an integer valued axis.
        dim : "...", str, Iterable of Hashable or None, optional
            Dimension(s) that `func` is applied over.
        axis : int or sequence of int, optional
            Forwarded to the parent ``reduce``.
        keep_attrs : bool, optional
            With True, the dataset's attributes (`attrs`) are carried over
            from the original object; with False the result has none. The
            default ``None`` is passed on as-is to the parent ``reduce``.
        keepdims : bool, default: False
            Forwarded to the parent ``reduce``.
        shortcut : bool, default: True
            Forwarded to the parent ``reduce``.
        **kwargs : dict
            Further keyword arguments given to `func`.

        Returns
        -------
        reduced : Dataset
            Summarized data with the indicated dimension(s) removed.
        """
        reduced = super().reduce(
            func=func,
            dim=dim,
            axis=axis,
            keep_attrs=keep_attrs,
            keepdims=keepdims,
            shortcut=shortcut,
            **kwargs,
        )
        return reduced

    def asfreq(self) -> Dataset:
        """Return values of original object at the new up-sampling frequency;
        essentially a re-index with new times set to NaN.

        Implemented as a mean over the resampling dimension. Note that
        ``self._obj`` is replaced by a copy without the non-dimension
        coordinates along that dimension.

        Returns
        -------
        resampled : Dataset
        """
        self._obj = self._drop_coords()
        if self._dim is None:
            return self.mean(None)
        return self.mean([self._dim])