1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
from typing import Optional
import _numpy_rms
import numpy as np
from numpy.typing import NDArray
# Default used when the generated ``_version`` module is not importable;
# a successful import below rebinds the name to the real version string.
__version__ = "unknown"
try:
    from ._version import version as __version__
except ImportError:
    # Deliberate best-effort: keep the "unknown" placeholder.
    pass
def rms(a: NDArray, window_size: Optional[int] = None) -> NDArray:
    """
    Calculate RMS series for the given NumPy array.

    C-contiguous float32 arrays with one or two dimensions are processed by
    the compiled ``_numpy_rms`` extension, one row at a time. Every other
    input is delegated to the ``rms_numpy`` fallback.

    :param a: NumPy array to process. Can be 1D or 2D.
    :param window_size: Window size for the RMS calculation. If not specified,
        it defaults to the length of the last axis of the array.
    :return: A NumPy array containing the RMS series. On the compiled path
        the last axis holds ``a.shape[-1] // window_size`` values and the
        dtype matches the input.
    :raises ValueError: If the array is empty or ``window_size`` is smaller
        than 1.
    """
    if 0 in a.shape:
        raise ValueError("Cannot input empty array")

    if window_size is None:
        window_size = a.shape[-1]
    elif window_size < 1:
        # Reject up front: a zero window would otherwise surface as a bare
        # ZeroDivisionError below, and a negative one as an unrelated
        # "negative dimensions" error from numpy.
        raise ValueError("window_size must be a positive integer")

    use_native = (
        a.dtype == np.dtype("float32")
        and a.ndim in (1, 2)
        and a.flags["C_CONTIGUOUS"]
    )
    if use_native:
        output_length = a.shape[-1] // window_size
        output_array = np.zeros(
            shape=a.shape[:-1] + (output_length,), dtype=a.dtype
        )

        # Hoist the loop-invariant attribute lookups.
        native_rms = _numpy_rms.lib.rms
        cast = _numpy_rms.ffi.cast

        # atleast_2d returns views, so a 1D input is handled as a single row
        # and every write through ``dst_row`` lands in ``output_array``.
        for src_row, dst_row in zip(
            np.atleast_2d(a), np.atleast_2d(output_array)
        ):
            native_rms(
                cast("float *", src_row.ctypes.data),
                window_size,
                cast("float *", dst_row.ctypes.data),
                output_length,
            )
        return output_array

    # Imported lazily so the fallback module is only loaded when it is needed.
    from .fallback import rms_numpy

    return rms_numpy(a, window_size)
|