File: __init__.py

package info (click to toggle)
numpy-rms 0.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 132 kB
  • sloc: python: 138; ansic: 95; makefile: 4
file content (59 lines) | stat: -rw-r--r-- 1,810 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from typing import Optional

import _numpy_rms
import numpy as np
from numpy.typing import NDArray

try:
    from ._version import version as __version__
except ImportError:
    __version__ = "unknown"


def rms(a: NDArray, window_size: Optional[int] = None) -> NDArray:
    """
    Calculate RMS series for the given NumPy array.

    :param a: NumPy array to process. Can be 1D or 2D.
    :param window_size: Window size for the RMS calculation. If not specified, it defaults to the length of the array.
    :return: A NumPy array containing the RMS series.
    """
    if 0 in a.shape:
        raise ValueError("Cannot input empty array")

    if window_size is None:
        window_size = a.shape[-1]

    if (
        a.dtype == np.dtype("float32")
        and a.ndim in (1, 2)
        and a.flags["C_CONTIGUOUS"]
    ):
        output_length = a.shape[-1] // window_size
        if a.ndim == 1:
            output_shape = (output_length,)
        else:  # a.ndim == 2
            output_shape = (a.shape[0], output_length)
        output_array = np.zeros(shape=output_shape, dtype=a.dtype)

        if a.ndim == 1:
            _numpy_rms.lib.rms(
                _numpy_rms.ffi.cast("float *", a.ctypes.data),
                window_size,
                _numpy_rms.ffi.cast("float *", output_array.ctypes.data),
                output_length,
            )
        else:  # a.ndim == 2
            for i in range(a.shape[0]):
                _numpy_rms.lib.rms(
                    _numpy_rms.ffi.cast("float *", a[i].ctypes.data),
                    window_size,
                    _numpy_rms.ffi.cast("float *", output_array[i].ctypes.data),
                    output_length,
                )

        return output_array

    from .fallback import rms_numpy

    return rms_numpy(a, window_size)