1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
from __future__ import annotations
from typing import Any, Generic, Mapping
import numpy as np
from packaging.version import Version
from xarray.core.options import _get_keep_attrs
from xarray.core.pdcompat import count_not_none
from xarray.core.pycompat import is_duck_dask_array
from xarray.core.types import T_DataWithCoords
def _get_alpha(com=None, span=None, halflife=None, alpha=None):
# pandas defines in terms of com (converting to alpha in the algo)
# so use its function to get a com and then convert to alpha
com = _get_center_of_mass(com, span, halflife, alpha)
return 1 / (1 + com)
def move_exp_nanmean(array, *, axis, alpha):
if is_duck_dask_array(array):
raise TypeError("rolling_exp is not currently support for dask-like arrays")
import numbagg
# No longer needed in numbag > 0.2.0; remove in time
if axis == ():
return array.astype(np.float64)
else:
return numbagg.move_exp_nanmean(array, axis=axis, alpha=alpha)
def move_exp_nansum(array, *, axis, alpha):
if is_duck_dask_array(array):
raise TypeError("rolling_exp is not currently supported for dask-like arrays")
import numbagg
# numbagg <= 0.2.0 did not have a __version__ attribute
if Version(getattr(numbagg, "__version__", "0.1.0")) < Version("0.2.0"):
raise ValueError("`rolling_exp(...).sum() requires numbagg>=0.2.1.")
return numbagg.move_exp_nansum(array, axis=axis, alpha=alpha)
def _get_center_of_mass(comass, span, halflife, alpha):
"""
Vendored from pandas.core.window.common._get_center_of_mass
See licenses/PANDAS_LICENSE for the function's license
"""
valid_count = count_not_none(comass, span, halflife, alpha)
if valid_count > 1:
raise ValueError("comass, span, halflife, and alpha are mutually exclusive")
# Convert to center of mass; domain checks ensure 0 < alpha <= 1
if comass is not None:
if comass < 0:
raise ValueError("comass must satisfy: comass >= 0")
elif span is not None:
if span < 1:
raise ValueError("span must satisfy: span >= 1")
comass = (span - 1) / 2.0
elif halflife is not None:
if halflife <= 0:
raise ValueError("halflife must satisfy: halflife > 0")
decay = 1 - np.exp(np.log(0.5) / halflife)
comass = 1 / decay - 1
elif alpha is not None:
if alpha <= 0 or alpha > 1:
raise ValueError("alpha must satisfy: 0 < alpha <= 1")
comass = (1.0 - alpha) / alpha
else:
raise ValueError("Must pass one of comass, span, halflife, or alpha")
return float(comass)
class RollingExp(Generic[T_DataWithCoords]):
"""
Exponentially-weighted moving window object.
Similar to EWM in pandas
Parameters
----------
obj : Dataset or DataArray
Object to window.
windows : mapping of hashable to int (or float for alpha type)
A mapping from the name of the dimension to create the rolling
exponential window along (e.g. `time`) to the size of the moving window.
window_type : {"span", "com", "halflife", "alpha"}, default: "span"
The format of the previously supplied window. Each is a simple
numerical transformation of the others. Described in detail:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.ewm.html
Returns
-------
RollingExp : type of input argument
"""
def __init__(
self,
obj: T_DataWithCoords,
windows: Mapping[Any, int | float],
window_type: str = "span",
):
self.obj: T_DataWithCoords = obj
dim, window = next(iter(windows.items()))
self.dim = dim
self.alpha = _get_alpha(**{window_type: window})
def mean(self, keep_attrs: bool | None = None) -> T_DataWithCoords:
"""
Exponentially weighted moving average.
Parameters
----------
keep_attrs : bool, default: None
If True, the attributes (``attrs``) will be copied from the original
object to the new one. If False, the new object will be returned
without attributes. If None uses the global default.
Examples
--------
>>> da = xr.DataArray([1, 1, 2, 2, 2], dims="x")
>>> da.rolling_exp(x=2, window_type="span").mean()
<xarray.DataArray (x: 5)>
array([1. , 1. , 1.69230769, 1.9 , 1.96694215])
Dimensions without coordinates: x
"""
if keep_attrs is None:
keep_attrs = _get_keep_attrs(default=True)
return self.obj.reduce(
move_exp_nanmean, dim=self.dim, alpha=self.alpha, keep_attrs=keep_attrs
)
def sum(self, keep_attrs: bool | None = None) -> T_DataWithCoords:
"""
Exponentially weighted moving sum.
Parameters
----------
keep_attrs : bool, default: None
If True, the attributes (``attrs``) will be copied from the original
object to the new one. If False, the new object will be returned
without attributes. If None uses the global default.
Examples
--------
>>> da = xr.DataArray([1, 1, 2, 2, 2], dims="x")
>>> da.rolling_exp(x=2, window_type="span").sum()
<xarray.DataArray (x: 5)>
array([1. , 1.33333333, 2.44444444, 2.81481481, 2.9382716 ])
Dimensions without coordinates: x
"""
if keep_attrs is None:
keep_attrs = _get_keep_attrs(default=True)
return self.obj.reduce(
move_exp_nansum, dim=self.dim, alpha=self.alpha, keep_attrs=keep_attrs
)
|