File: _peak_finding_utils.pyx

package info (click to toggle)
python-scipy 1.1.0-7
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 93,828 kB
  • sloc: python: 156,854; ansic: 82,925; fortran: 80,777; cpp: 7,505; makefile: 427; sh: 294
file content (351 lines) | stat: -rw-r--r-- 11,873 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
"""
Utility functions for finding peaks in signals.
"""

import numpy as np
import cython

cimport numpy as np
from libc.math cimport ceil


__all__ = ['_argmaxima1d', '_select_by_peak_distance', '_peak_prominences',
           '_peak_widths']


@cython.wraparound(False)
@cython.boundscheck(False)
def _argmaxima1d(np.float64_t[:] x not None):
    """
    Find indices of local maxima in a 1D array.

    This function finds all local maxima in a 1D array and returns their
    indices. For maxima who are wider than one sample the index of the center
    sample is returned (rounded down in case the number of samples is even).

    Parameters
    ----------
    x : ndarray
        The array to search for local maxima.

    Returns
    -------
    maxima : ndarray
        Indices of local maxima in `x`.

    See Also
    --------
    argrelmax

    Notes
    -----
    - Compared to `argrelmax` this function is significantly faster and can
      detect maxima that are more than one sample wide. However this comes at
      the cost of being only applicable to 1D arrays.
    - A maxima is defined as one or more samples of equal value that are
      surrounded on both sides by atleast one smaller sample.

    .. versionadded:: 1.1.0
    """
    # Preallocate, there can't be more maxima than half the size of `x`
    cdef np.ndarray[np.intp_t, ndim=1] maxima
    maxima = np.empty(x.shape[0] // 2, dtype=np.intp)
    cdef Py_ssize_t m = 0  # Pointer to the end of valid area in `maxima`

    # Variables to loop over `x`
    cdef Py_ssize_t i = 1  # Pointer to current sample, first one can't be maxima
    cdef Py_ssize_t i_max = x.shape[0] - 1  # Last sample can't be maxima
    cdef Py_ssize_t i_ahead  # Pointer to look ahead of current sample

    while i < i_max:
        # Test if previous sample is smaller
        if x[i - 1] < x[i]:
            i_ahead = i + 1

            # Find next sample that is unequal to x[i]
            while i_ahead < i_max and x[i_ahead] == x[i]:
                i_ahead += 1

            # Maxima is found if next unequal sample is smaller than x[i]
            if x[i_ahead] < x[i]:
                # Store sample in the center of flat area (round down)
                maxima[m] = (i + i_ahead - 1) // 2
                m += 1
                # Skip samples that can't be maxima
                i = i_ahead
        i += 1

    maxima.resize(m, refcheck=False)  # Keep only valid part of array memory.
    return maxima


@cython.wraparound(False)
@cython.boundscheck(False)
def _select_by_peak_distance(np.intp_t[:] peaks not None,
                             np.float64_t[:] priority not None,
                             np.float64_t distance):
    """
    Evaluate which peaks fulfill the distance condition.

    Parameters
    ----------
    peaks : ndarray
        Indices of peaks in `vector`.
    priority : ndarray
        An array matching `peaks` used to determine priority of each peak. A
        peak with a higher priority value is kept over one with a lower one.
    distance : np.float64
        Minimal distance that peaks must be spaced.

    Returns
    -------
    keep : ndarray[bool]
        A boolean mask evaluating to true where `peaks` fulfill the distance
        condition.

    Notes
    -----
    Declaring the input arrays as C-contiguous doesn't seem to have performance
    advantages.

    .. versionadded:: 1.1.0
    """
    cdef:
        np.intp_t[::1] priority_to_position
        np.int8_t[::1] keep
        np.intp_t i, j, k, peaks_size, distance_

    peaks_size = peaks.shape[0]
    # Round up because actual peak distance can only be natural number
    distance_ = <np.intp_t>ceil(distance)
    keep = np.ones(peaks_size, dtype=np.int8)  # Prepare array of flags

    # Create map from `i` (index for `peaks` sorted by `priority`) to `j` (index
    # for `peaks` sorted by position). This allows to iterate `peaks` and `keep`
    # with `j` by order of `priority` while still maintaining the ability to
    # step to neighbouring peaks with (`j` + 1) or (`j` - 1).
    priority_to_position = np.argsort(priority)

    with nogil:
        # Highest priority first -> iterate in reverse order (decreasing)
        for i in range(peaks_size - 1, -1, -1):
            # "Translate" `i` to `j` which points to current peak whose
            # neighbours are to be evaluated
            j = priority_to_position[i]
            if keep[j] == 0:
                # Skip evaluation for peak already marked as "don't keep"
                continue

            k = j - 1
            # Flag "earlier" peaks for removal until minimal distance is exceeded
            while 0 <= k and peaks[j] - peaks[k] < distance_:
                keep[k] = 0
                k -= 1

            k = j + 1
            # Flag "later" peaks for removal until minimal distance is exceeded
            while k < peaks_size and peaks[k] - peaks[j] < distance_:
                keep[k] = 0
                k += 1

    return keep.base.view(dtype=np.bool)  # Return as boolean array


@cython.wraparound(False)
@cython.boundscheck(False)
def _peak_prominences(np.float64_t[::1] x not None,
                      np.intp_t[::1] peaks not None,
                      np.intp_t wlen):
    """
    Calculate the prominence of each peak in a signal.

    Parameters
    ----------
    x : ndarray
        A signal with peaks.
    peaks : ndarray
        Indices of peaks in `x`.
    wlen : np.intp
        A window length in samples (see `peak_prominences`) which is rounded up
        to the nearest odd integer. If smaller than 2 the entire signal `x` is
        used.

    Returns
    -------
    prominences : ndarray
        The calculated prominences for each peak in `peaks`.
    left_bases, right_bases : ndarray
        The peaks' bases as indices in `x` to the left and right of each peak.

    Raises
    ------
    ValueError
        If an index in `peaks` doesn't point to a local maximum in `x`.

    Notes
    -----
    This is the inner function to `peak_prominences`.

    .. versionadded:: 1.1.0
    """
    cdef:
        np.float64_t[::1] prominences
        np.intp_t[::1] left_bases, right_bases
        np.float64_t left_min, right_min
        np.intp_t peak_nr, peak, i_min, i_max, i
        bint raise_error

    raise_error = False
    prominences = np.empty(peaks.shape[0], dtype=np.float64)
    left_bases = np.empty(peaks.shape[0], dtype=np.intp)
    right_bases = np.empty(peaks.shape[0], dtype=np.intp)

    with nogil:
        for peak_nr in range(peaks.shape[0]):
            peak = peaks[peak_nr]
            i_min = 0
            i_max = x.shape[0] - 1

            if 2 <= wlen:
                # Adjust window around the evaluated peak (within bounds);
                # if wlen is even the resulting window length is is implicitly
                # rounded to next odd integer
                i_min = max(peak - wlen // 2, i_min)
                i_max = min(peak + wlen // 2, i_max)

            # Find the left base in interval [i_min, peak]
            i = peak
            left_min = x[peak]
            while i_min <= i and x[i] <= x[peak]:
                if x[i] < left_min:
                    left_min = x[i]
                    left_bases[peak_nr] = i
                i -= 1
            if not left_min < x[peak]:
                raise_error = True  # Raise error outside nogil statement
                break

            # Find the right base in interval [peak, i_max]
            i = peak
            right_min = x[peak]
            while i <= i_max and x[i] <= x[peak]:
                if x[i] < right_min:
                    right_min = x[i]
                    right_bases[peak_nr] = i
                i += 1
            if not right_min < x[peak]:
                raise_error = True  # Raise error outside nogil statement
                break

            prominences[peak_nr] = x[peak] - max(left_min, right_min)

    if raise_error:
        raise ValueError('{} is not a valid peak'.format(peak))

    # Return memoryviews as ndarrays
    return prominences.base, left_bases.base, right_bases.base


@cython.wraparound(False)
@cython.boundscheck(False)
def _peak_widths(np.float64_t[::1] x not None,
                 np.intp_t[::1] peaks not None,
                 np.float64_t rel_height,
                 np.float64_t[::1] prominences not None,
                 np.intp_t[::1] left_bases not None,
                 np.intp_t[::1] right_bases not None):
    """
    Calculate the width of each each peak in a signal.

    Parameters
    ----------
    x : ndarray
        A signal with peaks.
    peaks : ndarray
        Indices of peaks in `x`.
    rel_height : np.float64
        Chooses the relative height at which the peak width is measured as a
        percentage of its prominence (see `peak_widths`).
    prominences : ndarray
        Prominences of each peak in `peaks` as returned by `peak_prominences`.
    left_bases, right_bases : ndarray
        Left and right bases of each peak in `peaks` as returned by
        `peak_prominences`.

    Returns
    -------
    widths : ndarray
        The widths for each peak in samples.
    width_heights : ndarray
        The height of the contour lines at which the `widths` where evaluated.
    left_ips, right_ips : ndarray
        Interpolated positions of left and right intersection points of a
        horizontal line at the respective evaluation height.

    Raises
    ------
    ValueError
        If the supplied prominence data doesn't satisfy the condition
        ``0 <= left_base <= peak <= right_base < x.shape[0]`` for each peak or
        if `peaks`, `left_bases` and `right_bases` don't share the same shape.

    Notes
    -----
    This is the inner function to `peak_widths`.

    .. versionadded:: 1.1.0
    """
    cdef:
        np.float64_t[::1] widths, width_heights, left_ips, right_ips
        np.float64_t height, left_ip, right_ip
        np.intp_t p, peak, i, i_max, i_min
        bint raise_error

    if not (peaks.shape[0] == prominences.shape[0] == left_bases.shape[0] ==
            right_bases.shape[0]):
        raise ValueError("arrays in `prominence_data` must have the same shape "
                         "as `peaks`")

    raise_error = False
    widths = np.empty(peaks.shape[0], dtype=np.float64)
    width_heights = np.empty(peaks.shape[0], dtype=np.float64)
    left_ips = np.empty(peaks.shape[0], dtype=np.float64)
    right_ips = np.empty(peaks.shape[0], dtype=np.float64)

    with nogil:
        for p in range(peaks.shape[0]):
            i_min = left_bases[p]
            i_max = right_bases[p]
            peak = peaks[p]
            # Validate bounds and order
            if not 0 <= i_min < peak < i_max < x.shape[0]:
                raise_error = True
                break
            height = width_heights[p] = x[peak] - prominences[p] * rel_height

            # Find intersection point on left side
            i = peak
            while i_min < i and height < x[i]:
                i -= 1
            left_ip = <np.float64_t>i
            if x[i] < height:
                # Interpolate if true intersection height is between samples
                left_ip += (height - x[i]) / (x[i + 1] - x[i])

            # Find intersection point on right side
            i = peak
            while i < i_max and height < x[i]:
                i += 1
            right_ip = <np.float64_t>i
            if  x[i] < height:
                # Interpolate if true intersection height is between samples
                right_ip -= (height - x[i]) / (x[i - 1] - x[i])

            widths[p] = right_ip - left_ip
            left_ips[p] = left_ip
            right_ips[p] = right_ip

    if raise_error:
        raise ValueError("prominence data is invalid for peak {}".format(peak))

    return widths.base, width_heights.base, left_ips.base, right_ips.base