File: resample.py

package info (click to toggle)
python-xarray 2025.08.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 11,796 kB
  • sloc: python: 115,416; makefile: 258; sh: 47
file content (521 lines) | stat: -rw-r--r-- 18,181 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
from __future__ import annotations

import warnings
from collections.abc import Callable, Hashable, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Literal

from xarray.core._aggregations import (
    DataArrayResampleAggregations,
    DatasetResampleAggregations,
)
from xarray.core.groupby import DataArrayGroupByBase, DatasetGroupByBase, GroupBy
from xarray.core.types import Dims, InterpOptions, T_Xarray

if TYPE_CHECKING:
    from xarray.core.dataarray import DataArray
    from xarray.core.dataset import Dataset
    from xarray.core.types import T_Chunks

from xarray.groupers import RESAMPLE_DIM


class Resample(GroupBy[T_Xarray]):
    """An object that extends the `GroupBy` object with additional logic
    for handling specialized re-sampling operations.

    You should create a `Resample` object by using the `DataArray.resample` or
    `Dataset.resample` methods. The dimension along re-sampling

    See Also
    --------
    DataArray.resample
    Dataset.resample

    """

    def __init__(
        self,
        *args,
        dim: Hashable | None = None,
        resample_dim: Hashable | None = None,
        **kwargs,
    ) -> None:
        if dim == resample_dim:
            raise ValueError(
                f"Proxy resampling dimension ('{resample_dim}') "
                f"cannot have the same name as actual dimension ('{dim}')!"
            )
        self._dim = dim

        super().__init__(*args, **kwargs)

    def _flox_reduce(
        self,
        dim: Dims,
        keep_attrs: bool | None = None,
        **kwargs,
    ) -> T_Xarray:
        result: T_Xarray = (
            super()
            ._flox_reduce(dim=dim, keep_attrs=keep_attrs, **kwargs)
            .rename({RESAMPLE_DIM: self._group_dim})  # type: ignore[assignment]
        )
        return result

    def shuffle_to_chunks(self, chunks: T_Chunks = None):
        """
        Sort or "shuffle" the underlying object.

        "Shuffle" means the object is sorted so that all group members occur sequentially,
        in the same chunk. Multiple groups may occur in the same chunk.
        This method is particularly useful for chunked arrays (e.g. dask, cubed).
        particularly when you need to map a function that requires all members of a group
        to be present in a single chunk. For chunked array types, the order of appearance
        is not guaranteed, but will depend on the input chunking.

        Parameters
        ----------
        chunks : int, tuple of int, "auto" or mapping of hashable to int or tuple of int, optional
            How to adjust chunks along dimensions not present in the array being grouped by.

        Returns
        -------
        DataArrayGroupBy or DatasetGroupBy

        Examples
        --------
        >>> import dask.array
        >>> da = xr.DataArray(
        ...     dims="time",
        ...     data=dask.array.arange(10, chunks=1),
        ...     coords={"time": xr.date_range("2001-01-01", freq="12h", periods=10)},
        ...     name="a",
        ... )
        >>> shuffled = da.resample(time="2D").shuffle_to_chunks()
        >>> shuffled
        <xarray.DataArray 'a' (time: 10)> Size: 80B
        dask.array<shuffle, shape=(10,), dtype=int64, chunksize=(4,), chunktype=numpy.ndarray>
        Coordinates:
          * time     (time) datetime64[ns] 80B 2001-01-01 ... 2001-01-05T12:00:00

        See Also
        --------
        dask.dataframe.DataFrame.shuffle
        dask.array.shuffle
        """
        (grouper,) = self.groupers
        return self._shuffle_obj(chunks).drop_vars(RESAMPLE_DIM)

    def _first_or_last(
        self, op: Literal["first", "last"], skipna: bool | None, keep_attrs: bool | None
    ) -> T_Xarray:
        from xarray.core.dataset import Dataset

        result = super()._first_or_last(op=op, skipna=skipna, keep_attrs=keep_attrs)
        if isinstance(result, Dataset):
            # Can't do this in the base class because group_dim is RESAMPLE_DIM
            # which is not present in the original object
            for var in result.data_vars:
                result._variables[var] = result._variables[var].transpose(
                    *self._obj._variables[var].dims
                )
        return result

    def _drop_coords(self) -> T_Xarray:
        """Drop non-dimension coordinates along the resampled dimension."""
        obj = self._obj
        for k, v in obj.coords.items():
            if k != self._dim and self._dim in v.dims:
                obj = obj.drop_vars([k])
        return obj

    def pad(self, tolerance: float | Iterable[float] | str | None = None) -> T_Xarray:
        """Forward fill new values at up-sampled frequency.

        Parameters
        ----------
        tolerance : float | Iterable[float] | str | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        padded : DataArray or Dataset
        """
        obj = self._drop_coords()
        (grouper,) = self.groupers
        return obj.reindex(
            {self._dim: grouper.full_index}, method="pad", tolerance=tolerance
        )

    ffill = pad

    def backfill(
        self, tolerance: float | Iterable[float] | str | None = None
    ) -> T_Xarray:
        """Backward fill new values at up-sampled frequency.

        Parameters
        ----------
        tolerance : float | Iterable[float] | str | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        backfilled : DataArray or Dataset
        """
        obj = self._drop_coords()
        (grouper,) = self.groupers
        return obj.reindex(
            {self._dim: grouper.full_index}, method="backfill", tolerance=tolerance
        )

    bfill = backfill

    def nearest(
        self, tolerance: float | Iterable[float] | str | None = None
    ) -> T_Xarray:
        """Take new values from nearest original coordinate to up-sampled
        frequency coordinates.

        Parameters
        ----------
        tolerance : float | Iterable[float] | str | None, default: None
            Maximum distance between original and new labels to limit
            the up-sampling method.
            Up-sampled data with indices that satisfy the equation
            ``abs(index[indexer] - target) <= tolerance`` are filled by
            new values. Data with indices that are outside the given
            tolerance are filled with ``NaN`` s.

        Returns
        -------
        upsampled : DataArray or Dataset
        """
        obj = self._drop_coords()
        (grouper,) = self.groupers
        return obj.reindex(
            {self._dim: grouper.full_index}, method="nearest", tolerance=tolerance
        )

    def interpolate(self, kind: InterpOptions = "linear", **kwargs) -> T_Xarray:
        """Interpolate up-sampled data using the original data as knots.

        Parameters
        ----------
        kind : {"linear", "nearest", "zero", "slinear", \
                "quadratic", "cubic", "polynomial"}, default: "linear"
            The method used to interpolate. The method should be supported by
            the scipy interpolator:

            - ``interp1d``: {"linear", "nearest", "zero", "slinear",
              "quadratic", "cubic", "polynomial"}
            - ``interpn``: {"linear", "nearest"}

            If ``"polynomial"`` is passed, the ``order`` keyword argument must
            also be provided.

        Returns
        -------
        interpolated : DataArray or Dataset

        See Also
        --------
        DataArray.interp
        Dataset.interp
        scipy.interpolate.interp1d

        """
        return self._interpolate(kind=kind, **kwargs)

    def _interpolate(self, kind="linear", **kwargs) -> T_Xarray:
        """Apply scipy.interpolate.interp1d along resampling dimension."""
        obj = self._drop_coords()
        (grouper,) = self.groupers
        kwargs.setdefault("bounds_error", False)
        return obj.interp(
            coords={self._dim: grouper.full_index},
            assume_sorted=True,
            method=kind,
            kwargs=kwargs,
        )


class DataArrayResample(
    Resample["DataArray"], DataArrayGroupByBase, DataArrayResampleAggregations
):
    """DataArrayGroupBy object specialized to time resampling operations over a
    specified dimension
    """

    def reduce(
        self,
        func: Callable[..., Any],
        dim: Dims = None,
        *,
        axis: int | Sequence[int] | None = None,
        keep_attrs: bool | None = None,
        keepdims: bool = False,
        shortcut: bool = True,
        **kwargs: Any,
    ) -> DataArray:
        """Reduce the items in this group by applying `func` along the
        pre-defined resampling dimension.

        Parameters
        ----------
        func : callable
            Function which can be called in the form
            `func(x, axis=axis, **kwargs)` to return the result of collapsing
            an np.ndarray over an integer valued axis.
        dim : "...", str, Iterable of Hashable or None, optional
            Dimension(s) over which to apply `func`.
        keep_attrs : bool, optional
            If True, the datasets's attributes (`attrs`) will be copied from
            the original object to the new one.  If False (default), the new
            object will be returned without attributes.
        **kwargs : dict
            Additional keyword arguments passed on to `func`.

        Returns
        -------
        reduced : DataArray
            Array with summarized data and the indicated dimension(s)
            removed.
        """
        return super().reduce(
            func=func,
            dim=dim,
            axis=axis,
            keep_attrs=keep_attrs,
            keepdims=keepdims,
            shortcut=shortcut,
            **kwargs,
        )

    def map(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...] = (),
        shortcut: bool | None = False,
        **kwargs: Any,
    ) -> DataArray:
        """Apply a function to each array in the group and concatenate them
        together into a new array.

        `func` is called like `func(ar, *args, **kwargs)` for each array `ar`
        in this group.

        Apply uses heuristics (like `pandas.GroupBy.apply`) to figure out how
        to stack together the array. The rule is:

        1. If the dimension along which the group coordinate is defined is
           still in the first grouped array after applying `func`, then stack
           over this dimension.
        2. Otherwise, stack over the new dimension given by name of this
           grouping (the argument to the `groupby` function).

        Parameters
        ----------
        func : callable
            Callable to apply to each array.
        shortcut : bool, optional
            Whether or not to shortcut evaluation under the assumptions that:

            (1) The action of `func` does not depend on any of the array
                metadata (attributes or coordinates) but only on the data and
                dimensions.
            (2) The action of `func` creates arrays with homogeneous metadata,
                that is, with the same dimensions and attributes.

            If these conditions are satisfied `shortcut` provides significant
            speedup. This should be the case for many common groupby operations
            (e.g., applying numpy ufuncs).
        args : tuple, optional
            Positional arguments passed on to `func`.
        **kwargs
            Used to call `func(ar, **kwargs)` for each array `ar`.

        Returns
        -------
        applied : DataArray
            The result of splitting, applying and combining this array.
        """
        # TODO: the argument order for Resample doesn't match that for its parent,
        # GroupBy
        combined = super().map(func, shortcut=shortcut, args=args, **kwargs)

        # If the aggregation function didn't drop the original resampling
        # dimension, then we need to do so before we can rename the proxy
        # dimension we used.
        if self._dim in combined.coords:
            combined = combined.drop_vars([self._dim])

        if RESAMPLE_DIM in combined.dims:
            combined = combined.rename({RESAMPLE_DIM: self._dim})

        return combined

    def apply(self, func, args=(), shortcut=None, **kwargs):
        """
        Backward compatible implementation of ``map``

        See Also
        --------
        DataArrayResample.map
        """
        warnings.warn(
            "Resample.apply may be deprecated in the future. Using Resample.map is encouraged",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        return self.map(func=func, shortcut=shortcut, args=args, **kwargs)

    def asfreq(self) -> DataArray:
        """Return values of original object at the new up-sampling frequency;
        essentially a re-index with new times set to NaN.

        Returns
        -------
        resampled : DataArray
        """
        self._obj = self._drop_coords()
        return self.mean(None if self._dim is None else [self._dim])


class DatasetResample(
    Resample["Dataset"], DatasetGroupByBase, DatasetResampleAggregations
):
    """DatasetGroupBy object specialized to resampling a specified dimension"""

    def map(
        self,
        func: Callable[..., Any],
        args: tuple[Any, ...] = (),
        shortcut: bool | None = None,
        **kwargs: Any,
    ) -> Dataset:
        """Apply a function over each Dataset in the groups generated for
        resampling and concatenate them together into a new Dataset.

        `func` is called like `func(ds, *args, **kwargs)` for each dataset `ds`
        in this group.

        Apply uses heuristics (like `pandas.GroupBy.apply`) to figure out how
        to stack together the datasets. The rule is:

        1. If the dimension along which the group coordinate is defined is
           still in the first grouped item after applying `func`, then stack
           over this dimension.
        2. Otherwise, stack over the new dimension given by name of this
           grouping (the argument to the `groupby` function).

        Parameters
        ----------
        func : callable
            Callable to apply to each sub-dataset.
        args : tuple, optional
            Positional arguments passed on to `func`.
        **kwargs
            Used to call `func(ds, **kwargs)` for each sub-dataset `ar`.

        Returns
        -------
        applied : Dataset
            The result of splitting, applying and combining this dataset.
        """
        # ignore shortcut if set (for now)
        applied = (func(ds, *args, **kwargs) for ds in self._iter_grouped())
        combined = self._combine(applied)

        # If the aggregation function didn't drop the original resampling
        # dimension, then we need to do so before we can rename the proxy
        # dimension we used.
        if self._dim in combined.coords:
            combined = combined.drop_vars(self._dim)

        if RESAMPLE_DIM in combined.dims:
            combined = combined.rename({RESAMPLE_DIM: self._dim})

        return combined

    def apply(self, func, args=(), shortcut=None, **kwargs):
        """
        Backward compatible implementation of ``map``

        See Also
        --------
        DataSetResample.map
        """

        warnings.warn(
            "Resample.apply may be deprecated in the future. Using Resample.map is encouraged",
            PendingDeprecationWarning,
            stacklevel=2,
        )
        return self.map(func=func, shortcut=shortcut, args=args, **kwargs)

    def reduce(
        self,
        func: Callable[..., Any],
        dim: Dims = None,
        *,
        axis: int | Sequence[int] | None = None,
        keep_attrs: bool | None = None,
        keepdims: bool = False,
        shortcut: bool = True,
        **kwargs: Any,
    ) -> Dataset:
        """Reduce the items in this group by applying `func` along the
        pre-defined resampling dimension.

        Parameters
        ----------
        func : callable
            Function which can be called in the form
            `func(x, axis=axis, **kwargs)` to return the result of collapsing
            an np.ndarray over an integer valued axis.
        dim : "...", str, Iterable of Hashable or None, optional
            Dimension(s) over which to apply `func`.
        keep_attrs : bool, optional
            If True, the datasets's attributes (`attrs`) will be copied from
            the original object to the new one.  If False (default), the new
            object will be returned without attributes.
        **kwargs : dict
            Additional keyword arguments passed on to `func`.

        Returns
        -------
        reduced : Dataset
            Array with summarized data and the indicated dimension(s)
            removed.
        """
        return super().reduce(
            func=func,
            dim=dim,
            axis=axis,
            keep_attrs=keep_attrs,
            keepdims=keepdims,
            shortcut=shortcut,
            **kwargs,
        )

    def asfreq(self) -> Dataset:
        """Return values of original object at the new up-sampling frequency;
        essentially a re-index with new times set to NaN.

        Returns
        -------
        resampled : Dataset
        """
        self._obj = self._drop_coords()
        return self.mean(None if self._dim is None else [self._dim])