1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203
|
from math import floor, ceil
from ._extensions._pywt import (DiscreteContinuousWavelet, ContinuousWavelet,
Wavelet, _check_dtype)
from ._functions import integrate_wavelet, scale2frequency
__all__ = ["cwt"]
import numpy as np
try:
# Prefer scipy.fft (new in SciPy 1.4)
import scipy.fft
fftmodule = scipy.fft
next_fast_len = fftmodule.next_fast_len
except ImportError:
try:
import scipy.fftpack
fftmodule = scipy.fftpack
next_fast_len = fftmodule.next_fast_len
except ImportError:
fftmodule = np.fft
# provide a fallback so scipy is an optional requirement
def next_fast_len(n):
"""Round up size to the nearest power of two.
Given a number of samples `n`, returns the next power of two
following this number to take advantage of FFT speedup.
This fallback is less efficient than `scipy.fftpack.next_fast_len`
"""
return 2**ceil(np.log2(n))
def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
"""
cwt(data, scales, wavelet)
One dimensional Continuous Wavelet Transform.
Parameters
----------
data : array_like
Input signal
scales : array_like
The wavelet scales to use. One can use
``f = scale2frequency(wavelet, scale)/sampling_period`` to determine
what physical frequency, ``f``. Here, ``f`` is in hertz when the
``sampling_period`` is given in seconds.
wavelet : Wavelet object or name
Wavelet to use
sampling_period : float
Sampling period for the frequencies output (optional).
The values computed for ``coefs`` are independent of the choice of
``sampling_period`` (i.e. ``scales`` is not scaled by the sampling
period).
method : {'conv', 'fft'}, optional
The method used to compute the CWT. Can be any of:
- ``conv`` uses ``numpy.convolve``.
- ``fft`` uses frequency domain convolution.
- ``auto`` uses automatic selection based on an estimate of the
computational complexity at each scale.
The ``conv`` method complexity is ``O(len(scale) * len(data))``.
The ``fft`` method is ``O(N * log2(N))`` with
``N = len(scale) + len(data) - 1``. It is well suited for large size
signals but slightly slower than ``conv`` on small ones.
axis: int, optional
Axis over which to compute the CWT. If not given, the last axis is
used.
Returns
-------
coefs : array_like
Continuous wavelet transform of the input signal for the given scales
and wavelet. The first axis of ``coefs`` corresponds to the scales.
The remaining axes match the shape of ``data``.
frequencies : array_like
If the unit of sampling period are seconds and given, than frequencies
are in hertz. Otherwise, a sampling period of 1 is assumed.
Notes
-----
Size of coefficients arrays depends on the length of the input array and
the length of given scales.
Examples
--------
>>> import pywt
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> x = np.arange(512)
>>> y = np.sin(2*np.pi*x/32)
>>> coef, freqs=pywt.cwt(y,np.arange(1,129),'gaus1')
>>> plt.matshow(coef) # doctest: +SKIP
>>> plt.show() # doctest: +SKIP
----------
>>> import pywt
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> t = np.linspace(-1, 1, 200, endpoint=False)
>>> sig = np.cos(2 * np.pi * 7 * t) + np.real(np.exp(-7*(t-0.4)**2)*np.exp(1j*2*np.pi*2*(t-0.4)))
>>> widths = np.arange(1, 31)
>>> cwtmatr, freqs = pywt.cwt(sig, widths, 'mexh')
>>> plt.imshow(cwtmatr, extent=[-1, 1, 1, 31], cmap='PRGn', aspect='auto',
... vmax=abs(cwtmatr).max(), vmin=-abs(cwtmatr).max()) # doctest: +SKIP
>>> plt.show() # doctest: +SKIP
"""
# accept array_like input; make a copy to ensure a contiguous array
dt = _check_dtype(data)
data = np.asarray(data, dtype=dt)
dt_cplx = np.result_type(dt, np.complex64)
if not isinstance(wavelet, (ContinuousWavelet, Wavelet)):
wavelet = DiscreteContinuousWavelet(wavelet)
if np.isscalar(scales):
scales = np.array([scales])
if not np.isscalar(axis):
raise np.AxisError("axis must be a scalar.")
dt_out = dt_cplx if wavelet.complex_cwt else dt
out = np.empty((np.size(scales),) + data.shape, dtype=dt_out)
precision = 10
int_psi, x = integrate_wavelet(wavelet, precision=precision)
int_psi = np.conj(int_psi) if wavelet.complex_cwt else int_psi
# convert int_psi, x to the same precision as the data
dt_psi = dt_cplx if int_psi.dtype.kind == 'c' else dt
int_psi = np.asarray(int_psi, dtype=dt_psi)
x = np.asarray(x, dtype=data.real.dtype)
if method == 'fft':
size_scale0 = -1
fft_data = None
elif not method == 'conv':
raise ValueError("method must be 'conv' or 'fft'")
if data.ndim > 1:
# move axis to be transformed last (so it is contiguous)
data = data.swapaxes(-1, axis)
# reshape to (n_batch, data.shape[-1])
data_shape_pre = data.shape
data = data.reshape((-1, data.shape[-1]))
for i, scale in enumerate(scales):
step = x[1] - x[0]
j = np.arange(scale * (x[-1] - x[0]) + 1) / (scale * step)
j = j.astype(int) # floor
if j[-1] >= int_psi.size:
j = np.extract(j < int_psi.size, j)
int_psi_scale = int_psi[j][::-1]
if method == 'conv':
if data.ndim == 1:
conv = np.convolve(data, int_psi_scale)
else:
# batch convolution via loop
conv_shape = list(data.shape)
conv_shape[-1] += int_psi_scale.size - 1
conv_shape = tuple(conv_shape)
conv = np.empty(conv_shape, dtype=dt_out)
for n in range(data.shape[0]):
conv[n, :] = np.convolve(data[n], int_psi_scale)
else:
# The padding is selected for:
# - optimal FFT complexity
# - to be larger than the two signals length to avoid circular
# convolution
size_scale = next_fast_len(
data.shape[-1] + int_psi_scale.size - 1
)
if size_scale != size_scale0:
# Must recompute fft_data when the padding size changes.
fft_data = fftmodule.fft(data, size_scale, axis=-1)
size_scale0 = size_scale
fft_wav = fftmodule.fft(int_psi_scale, size_scale, axis=-1)
conv = fftmodule.ifft(fft_wav * fft_data, axis=-1)
conv = conv[..., :data.shape[-1] + int_psi_scale.size - 1]
coef = - np.sqrt(scale) * np.diff(conv, axis=-1)
if out.dtype.kind != 'c':
coef = coef.real
# transform axis is always -1 due to the data reshape above
d = (coef.shape[-1] - data.shape[-1]) / 2.
if d > 0:
coef = coef[..., floor(d):-ceil(d)]
elif d < 0:
raise ValueError(
"Selected scale of {} too small.".format(scale))
if data.ndim > 1:
# restore original data shape and axis position
coef = coef.reshape(data_shape_pre)
coef = coef.swapaxes(axis, -1)
out[i, ...] = coef
frequencies = scale2frequency(wavelet, scales, precision)
if np.isscalar(frequencies):
frequencies = np.array([frequencies])
frequencies /= sampling_period
return out, frequencies
|