File: SimpleSiggen.py

package info (click to toggle)
soapysdr 0.8.1-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 948 kB
  • sloc: cpp: 5,378; ansic: 471; python: 311; sh: 21; makefile: 18
file content (158 lines) | stat: -rwxr-xr-x 5,005 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python
"""Simple signal generator for testing transmit

Continuously output a carrier with single sideband sinusoid amplitude
modulation.

Terminate with cntl-C.
"""

import argparse
import math
import signal
import time

import numpy as np

import SoapySDR
from SoapySDR import * #SOAPY_SDR_ constants

import soapy_log_handle

def siggen_app(
        args,
        rate,
        ampl=0.7,
        freq=None,
        tx_bw=None,
        tx_chan=0,
        tx_gain=None,
        tx_ant=None,
        clock_rate=None,
        wave_freq=None
):
    """Generate signal until an interrupt signal is received."""

    if wave_freq is None:
        wave_freq = rate / 10

    sdr = SoapySDR.Device(args)
    #set clock rate first
    if clock_rate is not None:
        sdr.setMasterclock_rate(clock_rate)

    #set sample rate
    sdr.setSampleRate(SOAPY_SDR_TX, tx_chan, rate)
    print("Actual Tx Rate %f Msps"%(sdr.getSampleRate(SOAPY_SDR_TX, tx_chan) / 1e6))

    #set bandwidth
    if tx_bw is not None:
        sdr.setBandwidth(SOAPY_SDR_TX, tx_chan, tx_bw)

    #set antenna
    print("Set the antenna")
    if tx_ant is not None:
        sdr.setAntenna(SOAPY_SDR_TX, tx_chan, tx_ant)

    #set overall gain
    print("Set the gain")
    if tx_gain is not None:
        sdr.setGain(SOAPY_SDR_TX, tx_chan, tx_gain)

    #tune frontends
    print("Tune the frontend")
    if freq is not None:
        sdr.setFrequency(SOAPY_SDR_TX, tx_chan, freq)


    print("Create Tx stream")
    tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [tx_chan])
    print("Activate Tx Stream")
    sdr.activateStream(tx_stream)
    phase_acc = 0
    phase_inc = 2*math.pi*wave_freq/rate
    stream_mtu = sdr.getStreamMTU(tx_stream)
    samps_chan = np.array([ampl]*stream_mtu, np.complex64)

    time_last_print = time.time()
    total_samps = 0

    state = dict(running=True)

    def signal_handler(signum, _):
        print('Signal handler called with signal {}'.format(signum))
        state['running'] = False

    signal.signal(signal.SIGINT, signal_handler)

    while state['running']:
        phase_acc_next = phase_acc + stream_mtu*phase_inc
        phases = np.linspace(phase_acc, phase_acc_next, stream_mtu)
        samps_chan = ampl*np.exp(1j * phases).astype(np.complex64)
        phase_acc = phase_acc_next
        while phase_acc > math.pi * 2:
            phase_acc -= math.pi * 2

        status = sdr.writeStream(tx_stream, [samps_chan], samps_chan.size, timeoutUs=1000000)
        if status.ret != samps_chan.size:
            raise Exception("Expected writeStream() to consume all samples! %d" % status.ret)
        total_samps += status.ret

        if time.time() > time_last_print + 5.0:
            rate = total_samps / (time.time() - time_last_print) / 1e6
            print("Python siggen rate: %f Msps" % rate)
            total_samps = 0
            time_last_print = time.time()

    #cleanup streams
    print("Cleanup stream")
    sdr.deactivateStream(tx_stream)
    sdr.closeStream(tx_stream)
    print("Done!")

def main():
    """Parse command line arguments and start sig-gen."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    parser.add_argument("--args", type=str, help="device factor arguments", default="")
    parser.add_argument("--rate", type=float, help="Tx and Rx sample rate", default=1e6)
    parser.add_argument("--ampl", type=float, help="Tx digital amplitude rate", default=0.7)
    parser.add_argument("--tx-ant", type=str, help="Optional Tx antenna")
    parser.add_argument("--tx-gain", type=float, help="Optional Tx gain (dB)")
    parser.add_argument("--tx-chan", type=int, help="Transmitter channel (def=0)", default=0)
    parser.add_argument("--freq", type=float, help="Optional Tx and Rx freq (Hz)")
    parser.add_argument("--tx-bw", type=float, help="Optional Tx filter bw (Hz)", default=5e6)
    parser.add_argument("--wave-freq", type=float, help="Baseband waveform freq (Hz)")
    parser.add_argument("--clock-rate", type=float, help="Optional clock rate (Hz)")
    parser.add_argument("--debug", action='store_true', help="Output debug messages")
    parser.add_argument(
        "--abort-on-error", action='store_true',
        help="Halts operations if the SDR logs an error")

    options = parser.parse_args()

    if options.abort_on_error:
        exception_level = SOAPY_SDR_WARNING
    else:
        exception_level = None
    soapy_log_handle.set_python_log_handler(exception_level=exception_level)
    if options.debug:
        SoapySDR.setLogLevel(SOAPY_SDR_DEBUG)

    siggen_app(
        args=options.args,
        rate=options.rate,
        ampl=options.ampl,
        freq=options.freq,
        tx_bw=options.tx_bw,
        tx_ant=options.tx_ant,
        tx_gain=options.tx_gain,
        tx_chan=options.tx_chan,
        clock_rate=options.clock_rate,
        wave_freq=options.wave_freq,
    )

if __name__ == '__main__':
    main()