1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158
|
#!/usr/bin/env python
"""Simple signal generator for testing transmit
Continuously output a carrier with single sideband sinusoid amplitude
modulation.
Terminate with cntl-C.
"""
import argparse
import math
import signal
import time
import numpy as np
import SoapySDR
from SoapySDR import * #SOAPY_SDR_ constants
import soapy_log_handle
def siggen_app(
args,
rate,
ampl=0.7,
freq=None,
tx_bw=None,
tx_chan=0,
tx_gain=None,
tx_ant=None,
clock_rate=None,
wave_freq=None
):
"""Generate signal until an interrupt signal is received."""
if wave_freq is None:
wave_freq = rate / 10
sdr = SoapySDR.Device(args)
#set clock rate first
if clock_rate is not None:
sdr.setMasterclock_rate(clock_rate)
#set sample rate
sdr.setSampleRate(SOAPY_SDR_TX, tx_chan, rate)
print("Actual Tx Rate %f Msps"%(sdr.getSampleRate(SOAPY_SDR_TX, tx_chan) / 1e6))
#set bandwidth
if tx_bw is not None:
sdr.setBandwidth(SOAPY_SDR_TX, tx_chan, tx_bw)
#set antenna
print("Set the antenna")
if tx_ant is not None:
sdr.setAntenna(SOAPY_SDR_TX, tx_chan, tx_ant)
#set overall gain
print("Set the gain")
if tx_gain is not None:
sdr.setGain(SOAPY_SDR_TX, tx_chan, tx_gain)
#tune frontends
print("Tune the frontend")
if freq is not None:
sdr.setFrequency(SOAPY_SDR_TX, tx_chan, freq)
print("Create Tx stream")
tx_stream = sdr.setupStream(SOAPY_SDR_TX, SOAPY_SDR_CF32, [tx_chan])
print("Activate Tx Stream")
sdr.activateStream(tx_stream)
phase_acc = 0
phase_inc = 2*math.pi*wave_freq/rate
stream_mtu = sdr.getStreamMTU(tx_stream)
samps_chan = np.array([ampl]*stream_mtu, np.complex64)
time_last_print = time.time()
total_samps = 0
state = dict(running=True)
def signal_handler(signum, _):
print('Signal handler called with signal {}'.format(signum))
state['running'] = False
signal.signal(signal.SIGINT, signal_handler)
while state['running']:
phase_acc_next = phase_acc + stream_mtu*phase_inc
phases = np.linspace(phase_acc, phase_acc_next, stream_mtu)
samps_chan = ampl*np.exp(1j * phases).astype(np.complex64)
phase_acc = phase_acc_next
while phase_acc > math.pi * 2:
phase_acc -= math.pi * 2
status = sdr.writeStream(tx_stream, [samps_chan], samps_chan.size, timeoutUs=1000000)
if status.ret != samps_chan.size:
raise Exception("Expected writeStream() to consume all samples! %d" % status.ret)
total_samps += status.ret
if time.time() > time_last_print + 5.0:
rate = total_samps / (time.time() - time_last_print) / 1e6
print("Python siggen rate: %f Msps" % rate)
total_samps = 0
time_last_print = time.time()
#cleanup streams
print("Cleanup stream")
sdr.deactivateStream(tx_stream)
sdr.closeStream(tx_stream)
print("Done!")
def main():
"""Parse command line arguments and start sig-gen."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument("--args", type=str, help="device factor arguments", default="")
parser.add_argument("--rate", type=float, help="Tx and Rx sample rate", default=1e6)
parser.add_argument("--ampl", type=float, help="Tx digital amplitude rate", default=0.7)
parser.add_argument("--tx-ant", type=str, help="Optional Tx antenna")
parser.add_argument("--tx-gain", type=float, help="Optional Tx gain (dB)")
parser.add_argument("--tx-chan", type=int, help="Transmitter channel (def=0)", default=0)
parser.add_argument("--freq", type=float, help="Optional Tx and Rx freq (Hz)")
parser.add_argument("--tx-bw", type=float, help="Optional Tx filter bw (Hz)", default=5e6)
parser.add_argument("--wave-freq", type=float, help="Baseband waveform freq (Hz)")
parser.add_argument("--clock-rate", type=float, help="Optional clock rate (Hz)")
parser.add_argument("--debug", action='store_true', help="Output debug messages")
parser.add_argument(
"--abort-on-error", action='store_true',
help="Halts operations if the SDR logs an error")
options = parser.parse_args()
if options.abort_on_error:
exception_level = SOAPY_SDR_WARNING
else:
exception_level = None
soapy_log_handle.set_python_log_handler(exception_level=exception_level)
if options.debug:
SoapySDR.setLogLevel(SOAPY_SDR_DEBUG)
siggen_app(
args=options.args,
rate=options.rate,
ampl=options.ampl,
freq=options.freq,
tx_bw=options.tx_bw,
tx_ant=options.tx_ant,
tx_gain=options.tx_gain,
tx_chan=options.tx_chan,
clock_rate=options.clock_rate,
wave_freq=options.wave_freq,
)
if __name__ == '__main__':
main()
|