File: rx_multi_spc_timed_commands_test.py

package info (click to toggle)
uhd 4.9.0.0%2Bds1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 184,180 kB
  • sloc: cpp: 262,887; python: 112,011; ansic: 102,670; vhdl: 57,031; tcl: 19,924; xml: 8,581; makefile: 3,028; sh: 2,812; pascal: 230; javascript: 120; csh: 94; asm: 20; perl: 11
file content (92 lines) | stat: -rw-r--r-- 3,771 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
#
# Copyright 2022 Ettus Research, a National Instruments Brand
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""Tests RX multi-spc timed commands.

Tests by generating samples from USRP with a delay that is not a multiple
of the spc and makes sure the tick count does not get rounded down to the nearest multiple of spc.
"""
import numpy as np
import uhd
from uhd_test_base import UHDPythonTestCase


class RxMultiSpcTimedCommandsTest(UHDPythonTestCase):
    """Run RxMultiSpcTimedCommandsTest."""

    test_name = "RxMultiSpcTimedCommandsTest"

    def run_test(self, test_name, test_args):
        """Run test and report results."""
        rate = test_args.get("rate", 1e6)
        channel = test_args.get("channel", 0)
        delay = test_args.get("delay", 0.5)
        spp = test_args.get("spp", 1024)
        # test_spc is the number of samples per cycle to test. We want
        # to use the maximum supported spc on any device, so that we
        # check that no errors slipped in for any timestamp modulo spc.
        test_spc = test_args.get("test_spc", 4)

        usrp = uhd.usrp.MultiUSRP(self.args_str)
        usrp.set_rx_rate(rate)
        st_args = uhd.usrp.StreamArgs("fc32", "sc16")
        st_args.channels = [channel]
        rx_streamer = usrp.get_rx_stream(st_args)

        run_results = {"passed": True}

        for timekeeper_offset in range(test_spc):
            usrp.set_time_now(uhd.types.TimeSpec(timekeeper_offset / rate))
            for sample_offset in range(test_spc):
                if not run_results["passed"]:
                    continue

                # Calculate a future time in ticks that has the needed alignment
                ticks = usrp.get_time_now().to_ticks(rate)
                ticks = ticks + uhd.types.TimeSpec(delay).to_ticks(rate)
                ticks = (ticks // test_spc) * test_spc
                ticks = ticks + sample_offset

                # Set up stream command for the first packet
                stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.num_done)
                stream_cmd.num_samps = spp
                stream_cmd.stream_now = False
                stream_cmd.time_spec = uhd.types.TimeSpec.from_ticks(ticks, rate)

                rx_streamer.issue_stream_cmd(stream_cmd)

                iq = np.empty(spp, dtype=np.complex64)
                rx_md = uhd.types.RXMetadata()
                # timeout (delay before recv + recv time + padding)
                timeout = delay + spp / rate + 0.5
                rx_streamer.recv(iq, rx_md, timeout=timeout)

                # If multi-spc is not working, the tick count of the sample returned will be
                # rounded down to the nearest multiple of spc
                if rx_md.time_spec.get_tick_count(rate) != stream_cmd.time_spec.get_tick_count(
                    rate
                ):
                    self.log.error("Actual packet received time does not match requested time.")
                    run_results["passed"] = False

                if not run_results["passed"]:
                    self.log.error(
                        f"Test Failed.\n"
                        f"Timekeeper offset: {timekeeper_offset}, "
                        f"Sample offset: {sample_offset}, "
                        f"Sample rate: {rate}"
                    )

        for key in sorted(run_results):
            self.log.info("%s = %s", str(key), str(run_results[key]))
            self.report_result("python_spc_rx_tester", key, run_results[key])
        if "passed" in run_results:
            self.report_result(
                "python_spc_rx_tester",
                "status",
                "Passed" if run_results["passed"] else "Failed",
            )
        return run_results