1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
#!/usr/bin/env python
from gnuradio import blocks
from gnuradio import gr
from gnuradio import uhd
from usrptest import phase_calc_ccf
from gnuradio.uhd.uhd_app import UHDApp
import numpy as np
class selftest_fg(gr.top_block):
    """Flowgraph that compares the phase of a test tone against a reference.

    A complex reference tone at ``self.freq`` (fixed at 10 kHz) is generated
    with ``samp_rate`` samples, i.e. one second at the nominal sample rate.

    Two modes exist, selected by ``devices``:

    * **Device test** (``devices`` non-empty): for every device address a UHD
      source and sink are created and tuned to ``freq``. The reference tone is
      sent to every sink, and each source feeds input 0 of its own
      ``phase_calc_ccf`` block with the reference on input 1. The float output
      of each phase block is collected in ``self.sink_dict``.
    * **Loopback test** (``devices`` empty): a copy of the reference shifted by
      pi/2 is used as the signal under test and the phase block output is
      collected in ``self.result``.

    Args:
        freq: RF center frequency applied to every USRP source and sink.
        samp_rate: Sample rate used for the tone and for the devices.
        dphase: Total width of the window around the mean phase inside which
            all evaluated samples must lie for the phase to count as settled.
        devices: Iterable of device address strings handed to
            ``uhd.usrp_source`` / ``uhd.usrp_sink``. ``None`` or an empty
            container selects the loopback test.
    """

    def __init__(self, freq, samp_rate, dphase, devices=None):
        gr.top_block.__init__(self, "Generate Signal extract phase")

        ##################################################
        # Variables
        ##################################################
        self.samp_rate = samp_rate
        self.freq = 10e3
        # A None sentinel replaces the former ``list()`` default, which was a
        # single list object shared by every instance created without devices.
        self.devices = [] if devices is None else devices
        self.dphase = dphase
        self.tx_gain = 50
        self.rx_gain = 50
        self.center_freq = freq
        self.omega = 2*np.pi*self.freq
        # Phase of the tone at each sample index.
        self.steps = np.arange(
            self.samp_rate)*float(self.omega)/float(self.samp_rate)
        self.reference = self.complex_sine(self.steps)
        # Same tone, advanced by a quarter period.
        self.test_signal = self.complex_sine(self.steps+0.5*np.pi)
        self.device_test = False

        ##################################################
        # Block dicts (keyed by device address)
        ##################################################
        self.rx_devices = dict()
        self.tx_devices = dict()
        self.sink_dict = dict()
        self.phase_dict = dict()
        self.reference_source = blocks.vector_source_c(self.reference)

        if len(self.devices):
            self.device_test = True
            self._setup_device_test()
        else:
            self._setup_loopback_test()

    def _setup_device_test(self):
        """Create, configure and connect the blocks for every USRP device."""
        ##################################################
        # Devices
        ##################################################
        for device in self.devices:
            # Create and configure all devices.
            # NOTE(review): the keyword is spelled ``channel`` here while the
            # UHD stream_args field is named ``channels`` - confirm intent.
            self.rx_devices[device] = uhd.usrp_source(
                device, uhd.stream_args(
                    cpu_format="fc32", channel=range(1)))
            self.tx_devices[device] = uhd.usrp_sink(
                device, uhd.stream_args(
                    cpu_format="fc32", channel=range(1)))
            self.rx_devices[device].set_samp_rate(self.samp_rate)
            self.rx_devices[device].set_center_freq(self.center_freq, 0)
            self.rx_devices[device].set_gain(self.rx_gain, 0)
            self.tx_devices[device].set_samp_rate(self.samp_rate)
            self.tx_devices[device].set_center_freq(self.center_freq, 0)
            self.tx_devices[device].set_gain(self.tx_gain, 0)
            self.sink_dict[device] = blocks.vector_sink_f()
            self.phase_dict[device] = phase_calc_ccf(
                self.samp_rate, self.freq)

        ##################################################
        # Connections
        ##################################################
        # Every transmitter sends the same reference tone.
        for tx_device in self.tx_devices.values():
            self.connect((self.reference_source, 0), (tx_device, 0))
        # Received signal on input 0, reference on input 1 of the phase block.
        for device_key in self.rx_devices.keys():
            self.connect(
                (self.rx_devices[device_key], 0),
                (self.phase_dict[device_key], 0))
            self.connect(
                (self.reference_source, 0),
                (self.phase_dict[device_key], 1))
            self.connect(
                (self.phase_dict[device_key], 0),
                (self.sink_dict[device_key], 0))

    def _setup_loopback_test(self):
        """Create and connect the blocks for the hardware-free self test."""
        ##################################################
        # Blocks
        ##################################################
        self.result = blocks.vector_sink_f(1)
        self.test_source = blocks.vector_source_c(self.test_signal)
        self.block_phase_calc = phase_calc_ccf(
            self.samp_rate, self.freq)

        ##################################################
        # Connections
        ##################################################
        self.connect((self.reference_source, 0), (self.block_phase_calc, 1))
        self.connect((self.test_source, 0), (self.block_phase_calc, 0))
        self.connect((self.block_phase_calc, 0), (self.result, 0))

    def complex_sine(self, steps):
        """Return ``exp(1j*steps)``, a unit-amplitude complex tone."""
        return np.exp(1j*steps)

    def get_samp_rate(self):
        """Return the stored sample rate."""
        return self.samp_rate

    def set_samp_rate(self, samp_rate):
        """Store a new sample rate.

        Only the attribute is updated; blocks and signals created in
        ``__init__`` are not reconfigured.
        """
        self.samp_rate = samp_rate

    def run(self):
        """Run the flowgraph to completion and evaluate the device results.

        Returns:
            In device-test mode, a dict mapping each device address to the
            mean of its evaluated phase samples. Devices whose phase did not
            stay inside ``dphase`` (or that delivered too few samples) are
            reported on stdout and left out of the dict.
            In loopback mode nothing is evaluated here and ``None`` is
            returned; the samples remain available in ``self.result``.
        """
        self.start()
        self.wait()
        if not self.device_test:
            return None

        # The first and last 20 % of one nominal second are discarded before
        # evaluation (presumably to skip start/stop transients).
        trim = int(0.2*self.samp_rate)
        half_window = self.dphase*0.5
        data = dict()
        for device_key, sink in self.sink_dict.items():
            curr_data = sink.data()
            # An explicit stop index is used because ``[trim:-trim]`` would
            # yield an empty slice whenever ``trim`` is 0.
            curr_data = curr_data[trim:max(len(curr_data) - trim, 0)]
            if len(curr_data) == 0:
                # np.max/np.min raise ValueError on empty input.
                print("Error not enough samples to evaluate phase")
                continue
            phase_avg = np.average(curr_data)
            if (np.max(curr_data) < phase_avg + half_window
                    and np.min(curr_data) > phase_avg - half_window):
                data[device_key] = phase_avg
            else:
                print("Error phase not settled")
        return data
|