1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210
|
# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
Base signal processing functions and utilities
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
"""
from __future__ import annotations
from collections.abc import Callable
from typing import Any
import numpy as np
from sigima.objects import NO_ROI, GeometryResult, KindShape, SignalObj
from sigima.proc.base import dst_1_to_1
def restore_data_outside_roi(dst: SignalObj, src: SignalObj) -> None:
    """Copy back the source samples lying outside the Region Of Interest (ROI).

    After a computation, the masked entries of ``dst.xydata`` are overwritten
    in place with the corresponding entries of ``src.xydata``. This only happens
    when every one of the following conditions holds:

    - both signals carry mask data (i.e. ``maskdata`` is not None),
    - both masks are equal,
    - both ``xydata`` arrays share the same dtype,
    - both ``xydata`` arrays share the same shape.

    In any other case the destination signal is left untouched.

    Args:
        dst: destination signal object (modified in place)
        src: source signal object
    """
    mask = src.maskdata
    # Without a mask on both sides there is nothing to restore.
    if mask is None or dst.maskdata is None:
        return
    if not np.array_equal(mask, dst.maskdata):
        return
    src_xy = src.xydata
    dst_xy = dst.xydata
    # Incompatible layouts: a masked copy would be meaningless (or fail).
    if dst_xy.dtype != src_xy.dtype or dst_xy.shape != src_xy.shape:
        return
    dst_xy[mask] = src_xy[mask]
def is_uncertainty_data_available(signals: SignalObj | list[SignalObj]) -> bool:
    """Tell whether every given signal carries uncertainty data.

    This function is used to decide whether enough information is available to
    propagate uncertainty.

    Args:
        signals: Signal object or list of signal objects.

    Returns:
        True if ``dy`` is not None for all signals, False otherwise.
    """
    # Accept a lone signal by treating it as a one-element collection.
    candidates = [signals] if isinstance(signals, SignalObj) else signals
    for candidate in candidates:
        if candidate.dy is None:
            return False
    return True
class Wrap1to1Func:
    """Wrap a 1 array → 1 array function (the simple case of y1 = f(y0)) to produce
    a 1 signal → 1 signal function, which can be used as a Sigima computation function
    and inside DataLab's infrastructure to perform computations with the Signal
    Processor object.

    This wrapping mechanism using a class is necessary for the resulting function to
    be picklable by the ``multiprocessing`` module.

    The instance of this wrapper is callable and returns
    a :class:`sigima.objects.SignalObj` object.

    Example:

        >>> import numpy as np
        >>> from sigima.proc.signal import Wrap1to1Func
        >>> import sigima.objects
        >>> def square(y):
        ...     return y**2
        >>> compute_square = Wrap1to1Func(square)
        >>> x = np.linspace(0, 10, 100)
        >>> y = np.sin(x)
        >>> sig0 = sigima.objects.create_signal("Example", x, y)
        >>> sig1 = compute_square(sig0)

    Args:
        func: 1 array → 1 array function
        *args: Additional positional arguments to pass to the function
        **kwargs: Additional keyword arguments to pass to the function

    Raises:
        AttributeError: if `func` has no ``__name__`` attribute (e.g. a
         ``functools.partial`` object) and no `func_name` keyword is given.

    .. note::

        If `func_name` is provided in the keyword arguments, it will be used as the
        function name instead of the default name derived from the function itself.
        It is removed from the keyword arguments forwarded to the function.

    .. note::

        This wrapper is suitable for functions that don't require custom uncertainty
        propagation. For mathematical functions with specific uncertainty formulas
        (sqrt, log10, exp, etc.), implement uncertainty propagation directly in the
        computation function.
    """

    def __init__(self, func: Callable, *args: Any, **kwargs: Any) -> None:
        self.func = func
        self.args = args
        self.kwargs = kwargs
        # Only fall back on ``func.__name__`` when no explicit name is supplied:
        # ``dict.pop(key, default)`` evaluates its default eagerly, which used to
        # raise AttributeError for callables without ``__name__`` (such as
        # ``functools.partial`` objects) even though ``func_name`` was given.
        if "func_name" in self.kwargs:
            self.__name__ = self.kwargs.pop("func_name")
        else:
            self.__name__ = func.__name__
        self.__doc__ = func.__doc__
        # NOTE: this sets the docstring of the function object behind ``__call__``,
        # which lives on the class: it is shared by every instance, so the most
        # recently created wrapper determines it.
        self.__call__.__func__.__doc__ = self.func.__doc__

    def __call__(self, src: SignalObj) -> SignalObj:
        """Compute the function on the input signal and return the result signal

        Args:
            src: input signal object

        Returns:
            Result signal object
        """
        # Textual summary of the extra arguments, handed to ``dst_1_to_1`` together
        # with the function name; keyword arguments set to None are left out.
        suffix = ", ".join(
            [str(arg) for arg in self.args]
            + [f"{k}={v}" for k, v in self.kwargs.items() if v is not None]
        )
        dst = dst_1_to_1(src, self.__name__, suffix)
        x, y = src.get_data()
        # Apply function and propagate uncertainty unchanged
        dst.set_xydata(x, self.func(y, *self.args, **self.kwargs), src.dx, src.dy)
        restore_data_outside_roi(dst, src)
        return dst
def compute_geometry_from_obj(
    title: str,
    shape: str | KindShape,
    obj: SignalObj,
    func: Callable,
    *args: Any,
) -> GeometryResult | None:
    """Build a geometry result by running a computation function on a signal object.

    The computation function is called once per ROI index yielded by
    ``obj.iterate_roi_indices()``, on the data returned by ``obj.get_data`` for
    that index. All usable outputs are stacked into a single coordinate array.

    Args:
        title: Result title
        shape: Result shape kind (e.g., "segment", "point", KindShape.MARKER).
         An unrecognized string falls back to ``KindShape.POINT``.
        obj: Input signal object
        func: Computation function that takes (x, y, ``*args``) and returns coordinates
        *args: Additional computation function arguments

    Returns:
        Result geometry object or None if no result is found

    .. note::

        The computation function must take x and y arrays as the first two arguments,
        followed by any additional arguments, and return a NumPy array containing
        coordinate pairs in the form ``[[x0, y0], [x1, y1], ...]``. A returned
        ``None``, an empty result or a malformed result is skipped silently.
    """
    blocks: list[np.ndarray] = []
    owners: list[int] = []

    for i_roi in obj.iterate_roi_indices():
        x, y = obj.get_data(i_roi)
        # Unpacking an empty ``args`` tuple is the same as calling ``func(x, y)``.
        raw = func(x, y, *args)
        if raw is None:
            continue
        values = np.array(raw, dtype=float)
        if values.size == 0:
            continue

        # Bring the output to a 2D array with one geometry item per row.
        if values.ndim == 1:
            n_values = len(values)
            if shape in ("segment", KindShape.SEGMENT) and n_values == 4:
                # A single segment given flat as [x0, y0, x1, y1]
                values = values.reshape(1, 4)
            elif n_values % 2 == 0:
                # Flat coordinate list -> (x, y) pairs for points/markers
                values = values.reshape(-1, 2)
            else:
                continue  # Odd number of values: malformed, skip it
        elif values.ndim != 2 or values.shape[1] < 2:
            continue  # Wrong rank or too few columns: malformed, skip it

        blocks.append(values)
        owner = NO_ROI if i_roi is None else int(i_roi)
        # One ROI index entry per row contributed by this ROI
        owners += [owner] * values.shape[0]

    if not blocks:
        return None

    # Resolve the shape kind as a KindShape member
    if isinstance(shape, KindShape):
        kind = shape
    else:
        known_kinds = (
            ("segment", KindShape.SEGMENT),
            ("point", KindShape.POINT),
            ("marker", KindShape.MARKER),
        )
        # Unknown names fall back to POINT
        kind = next(
            (member for label, member in known_kinds if shape == label),
            KindShape.POINT,
        )

    return GeometryResult(
        title=title,
        kind=kind,
        coords=np.vstack(blocks),
        roi_indices=np.array(owners, dtype=int),
    )
|