File: stability_unit_test.py

package info (click to toggle)
python-sigima 1.0.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 24,956 kB
  • sloc: python: 33,326; makefile: 3
file content (431 lines) | stat: -rw-r--r-- 15,934 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.

"""
Signal stability analysis unit test.
"""

# pylint: disable=invalid-name  # Allows short reference names like x, y, ...
# pylint: disable=duplicate-code

from __future__ import annotations

from typing import Callable

import numpy as np
import pytest

import sigima.objects
import sigima.params
import sigima.proc.signal
from sigima.tests.helpers import check_array_result


def get_optimal_points(test_func: Callable) -> int:
    """Return optimal number of points for different algorithms.

    Args:
        test_func: Test function object to determine algorithm type

    Returns:
        Optimal number of points balancing accuracy and performance
    """
    func_name = test_func.__name__
    if "overlapping" in func_name or "hadamard" in func_name:
        return 500  # Minimum for meaningful results
    if "modified" in func_name or "total" in func_name:
        return 1000  # Need more for averaging
    # allan_variance, allan_deviation, time_deviation
    return 2000  # Balance between speed and accuracy


def generate_white_noise(n_points: int, sigma=1.0) -> np.ndarray:
    """Generate white noise with known characteristics.

    Args:
        n_points: Number of data points
        sigma: Standard deviation of the white noise (default is 1.0)

    Returns:
        Array of white noise values
    """
    return np.random.normal(0, sigma, n_points)


def theoretical_allan_variance_white_noise(tau: float, sigma: float) -> float:
    """Calculate theoretical Allan variance for white noise.

    For white noise: AVAR(τ) = σ²/(2τ)
    But the Allan variance is computed as AVAR(τ) = σ²τ/τ = σ²τ because of the
    overlapping nature of the samples.

    Args:
        tau: Averaging time
        sigma: Standard deviation of the white noise

    Returns:
        Allan variance value for the given tau and sigma
    """
    return sigma**2 / tau


def generate_drift_signal(
    n_points: int, slope: float, intercept: float = 0
) -> tuple[np.ndarray, np.ndarray]:
    """Generate a linear drift signal.

    Args:
        n_points: Number of data points
        slope: Slope of the linear drift
        intercept: Intercept of the linear drift (default is 0)

    Returns:
        A tuple of (time array, values array)
    """
    time = np.arange(n_points)
    values = slope * time + intercept
    return time, values


def theoretical_allan_variance_drift(tau: float, slope: float) -> float:
    """Theoretical Allan variance for a drift signal.

    Args:
        tau: Averaging time
        slope: Slope of the linear drift

    Returns:
        Allan variance value for the given tau and slope
    """
    return (slope**2 * tau**2) / 2


@pytest.mark.validation
def test_signal_allan_variance():
    """Test Allan variance computation against theoretical values."""
    n_points = get_optimal_points(test_signal_allan_variance)
    sigma = 1.0

    # Set random seed for reproducibility
    np.random.seed(42)

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters - limit max_tau for more reliable statistics
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 20  # Limited to ensure sufficient samples for averaging

    # Compute Allan variance using the high-level function
    res1 = sigima.proc.signal.allan_variance(sig1, param)
    th_av_white = theoretical_allan_variance_white_noise(res1.x, sigma)

    # Use relative tolerance for white noise (statistical variation scales)
    # 20% tolerance accounts for statistical variance in Allan estimator
    check_array_result(
        "White noise Allan variance",
        res1.y,
        th_av_white,
        rtol=0.20,
        atol=0.005,
    )

    # Generate and test drift signal (deterministic - same seed not needed)
    slope = 0.01
    time, values = generate_drift_signal(n_points, slope)
    sig2 = sigima.objects.create_signal("Drift Test", time, values)

    # Compute Allan variance using the high-level function
    res2 = sigima.proc.signal.allan_variance(sig2, param)
    th_av_drift = theoretical_allan_variance_drift(res2.x, slope)

    # Drift is deterministic, tighter tolerances apply
    check_array_result(
        "Drift Allan variance",
        res2.y,
        th_av_drift,
        rtol=0.05,
        atol=0.0001,
    )


@pytest.mark.validation
def test_signal_allan_deviation():
    """Test Allan deviation computation against theoretical values."""
    n_points = get_optimal_points(test_signal_allan_deviation)
    sigma = 1.0

    # Set random seed for reproducibility
    np.random.seed(43)

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters - limit max_tau for more reliable statistics
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 20  # Limited to ensure sufficient samples for averaging

    # Compute Allan deviation using the high-level function
    res1 = sigima.proc.signal.allan_deviation(sig1, param)
    th_av_white = theoretical_allan_variance_white_noise(res1.x, sigma)

    # Use relative tolerance for white noise (statistical variation scales)
    # 20% tolerance accounts for statistical variance in Allan estimator
    check_array_result(
        "White noise Allan deviation",
        res1.y,
        np.sqrt(th_av_white),
        rtol=0.20,
        atol=0.005,
    )

    # Generate and test drift signal (deterministic - same seed not needed)
    slope = 0.01
    time, values = generate_drift_signal(n_points, slope)
    sig2 = sigima.objects.create_signal("Drift Test", time, values)

    # Compute Allan deviation using the high-level function
    res2 = sigima.proc.signal.allan_deviation(sig2, param)
    th_av_drift = theoretical_allan_variance_drift(res2.x, slope)

    # Drift is deterministic, tighter tolerances apply
    check_array_result(
        "Drift Allan deviation",
        res2.y,
        np.sqrt(th_av_drift),
        rtol=0.05,
        atol=0.0001,
    )


@pytest.mark.validation
def test_signal_overlapping_allan_variance():
    """Test Overlapping Allan variance computation."""
    n_points = get_optimal_points(test_signal_overlapping_allan_variance)
    sigma = 1.0

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 50

    # Compute Overlapping Allan variance using the high-level function
    res1 = sigima.proc.signal.overlapping_allan_variance(sig1, param)

    # Overlapping Allan variance should produce finite, positive results
    assert len(res1.y) > 0, "Overlapping Allan variance should produce results"
    valid_values = res1.y[~np.isnan(res1.y)]
    assert len(valid_values) > 0, "Overlapping Allan variance should have valid values"
    assert np.all(valid_values > 0), "Overlapping Allan variance should be positive"

    # For white noise, overlapping Allan variance should be related to the noise level
    # and generally decrease with tau (though not necessarily following exact 1/tau)
    expected_order = sigma**2
    assert np.mean(valid_values) < 10 * expected_order, (
        "Overlapping Allan variance should be reasonable for white noise"
    )

    # Generate and test drift signal
    slope = 0.01
    time, values = generate_drift_signal(n_points, slope)
    sig2 = sigima.objects.create_signal("Drift Test", time, values)

    # Compute Overlapping Allan variance using the high-level function
    res2 = sigima.proc.signal.overlapping_allan_variance(sig2, param)

    # For drift signals, overlapping Allan variance should also be finite and positive
    valid_drift_values = res2.y[~np.isnan(res2.y)]
    assert len(valid_drift_values) > 0, (
        "Overlapping Allan variance should work for drift"
    )
    assert np.all(valid_drift_values > 0), (
        "Overlapping Allan variance should be positive"
    )

    # Compare with regular Allan variance to ensure overlapping version
    # produces reasonable results
    res_regular = sigima.proc.signal.allan_variance(sig1, param)
    valid_regular = res_regular.y[~np.isnan(res_regular.y)]
    if len(valid_regular) > 0 and len(valid_values) > 0:
        # Results should be of similar order of magnitude
        ratio = np.mean(valid_values) / np.mean(valid_regular)
        assert 0.1 < ratio < 10, (
            "Overlapping and regular Allan variance should be of similar magnitude"
        )


@pytest.mark.validation
def test_signal_modified_allan_variance():
    """Test Modified Allan variance computation."""
    n_points = get_optimal_points(test_signal_modified_allan_variance)
    sigma = 1.0

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 50

    # Compute Modified Allan variance using the high-level function
    res1 = sigima.proc.signal.modified_allan_variance(sig1, param)

    # For white noise, Modified Allan variance should be proportional to 1/tau
    # The exact relationship depends on the specific implementation
    # We check that values are reasonable and decrease with tau
    assert len(res1.y) > 0, "Modified Allan variance should produce results"
    assert np.all(res1.y[~np.isnan(res1.y)] > 0), (
        "Modified Allan variance should be positive"
    )

    # For white noise, Modified Allan variance typically decreases with tau
    valid_indices = ~np.isnan(res1.y)
    if np.sum(valid_indices) > 1:
        valid_y = res1.y[valid_indices]
        # Check general decreasing trend for first few points
        if len(valid_y) >= 3:
            assert valid_y[2] < valid_y[0], (
                "Modified Allan variance should generally decrease for white noise"
            )


@pytest.mark.validation
def test_signal_hadamard_variance():
    """Test Hadamard variance computation."""
    n_points = get_optimal_points(test_signal_hadamard_variance)
    sigma = 1.0

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 50

    # Compute Hadamard variance using the high-level function
    res1 = sigima.proc.signal.hadamard_variance(sig1, param)

    # For white noise, Hadamard variance should be finite and positive
    assert len(res1.y) > 0, "Hadamard variance should produce results"
    valid_values = res1.y[~np.isnan(res1.y)]
    assert len(valid_values) > 0, "Hadamard variance should have valid values"
    assert np.all(valid_values > 0), "Hadamard variance should be positive"

    # Generate and test linear drift signal
    # (Hadamard variance is robust to linear drift)
    slope = 0.01
    time, values = generate_drift_signal(n_points, slope)
    sig2 = sigima.objects.create_signal("Drift Test", time, values)

    # Compute Hadamard variance for drift signal
    res2 = sigima.proc.signal.hadamard_variance(sig2, param)

    # Hadamard variance should be less sensitive to linear drift than Allan variance
    # For pure linear drift, Hadamard variance should be close to zero or very small
    valid_drift_values = res2.y[~np.isnan(res2.y)]
    if len(valid_drift_values) > 0:
        # Hadamard variance should be smaller for drift signals
        assert np.mean(valid_drift_values) < np.mean(valid_values), (
            "Hadamard variance should be smaller for drift signals than white noise"
        )


@pytest.mark.validation
def test_signal_total_variance():
    """Test Total variance computation."""
    n_points = get_optimal_points(test_signal_total_variance)
    sigma = 1.0

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 50

    # Compute Total variance using the high-level function
    res1 = sigima.proc.signal.total_variance(sig1, param)

    # Total variance should be finite and positive
    assert len(res1.y) > 0, "Total variance should produce results"
    valid_values = res1.y[~np.isnan(res1.y)]
    assert len(valid_values) > 0, "Total variance should have valid values"
    assert np.all(valid_values > 0), "Total variance should be positive"

    # For white noise, total variance should be related to the noise level
    # and should be of the same order of magnitude as the square of the noise
    expected_order = sigma**2
    assert np.mean(valid_values) < 100 * expected_order, (
        "Total variance should be reasonable for white noise"
    )


@pytest.mark.validation
def test_signal_time_deviation():
    """Test Time Deviation computation."""
    n_points = get_optimal_points(test_signal_time_deviation)
    sigma = 1.0

    # Generate and test white noise signal
    time_white = np.arange(n_points)
    values_white = generate_white_noise(n_points, sigma)
    sig1 = sigima.objects.create_signal("White Noise Test", time_white, values_white)

    # Define Allan variance parameters
    param = sigima.params.AllanVarianceParam()
    param.max_tau = 50

    # Compute Time Deviation using the high-level function
    res1 = sigima.proc.signal.time_deviation(sig1, param)

    # Time deviation should be finite and positive
    assert len(res1.y) > 0, "Time deviation should produce results"
    valid_values = res1.y[~np.isnan(res1.y)]
    assert len(valid_values) > 0, "Time deviation should have valid values"
    assert np.all(valid_values > 0), "Time deviation should be positive"

    # Time deviation is related to Allan variance: TDEV = sqrt(AVAR) * tau
    # So it should increase with tau for white noise
    if len(valid_values) >= 2:
        valid_x = res1.x[~np.isnan(res1.y)]
        # Check that time deviation generally increases with tau for white noise
        correlation = np.corrcoef(valid_x[: len(valid_values)], valid_values)[0, 1]
        assert correlation > 0.5, (
            "Time deviation should generally increase with tau for white noise"
        )

    # Compare with Allan deviation to verify the relationship
    res_adev = sigima.proc.signal.allan_deviation(sig1, param)

    # TDEV = ADEV * tau (approximately)
    # Check this relationship for some values
    for i, tau in enumerate(res1.x[: min(5, len(res1.x))]):
        if not np.isnan(res1.y[i]) and not np.isnan(res_adev.y[i]) and tau > 0:
            expected_tdev = res_adev.y[i] * tau
            relative_error = abs(res1.y[i] - expected_tdev) / expected_tdev
            assert relative_error < 0.1, (
                f"Time deviation should match ADEV * tau relationship at tau={tau}"
            )


if __name__ == "__main__":
    test_signal_allan_variance()
    test_signal_allan_deviation()
    test_signal_overlapping_allan_variance()
    test_signal_modified_allan_variance()
    test_signal_hadamard_variance()
    test_signal_total_variance()
    test_signal_time_deviation()