1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
#!/usr/bin/env python2
import numpy as np
import time
import copy
import logging
def setup_phase_alignment_parser(parser):
test_group = parser.add_argument_group(
'Phase alignment specific arguments')
test_group.add_argument(
'--runs',
default=10,
type=int,
help='Number of times to retune and measure d_phi')
test_group.add_argument(
'--duration',
default=5.0,
type=float,
help='Duration of a measurement run')
test_group.add_argument(
'--measurement-setup',
type=str,
help='Comma-seperated list of channel ids. Phase difference will be calculated between consecutive channels. default=(0,1,2,..,M-1) M: num_chan'
)
test_group.add_argument(
'--log-level',
type=str,
choices=["critical", "error", "warning", "info", "debug"],
default="info")
test_group.add_argument(
'--freq-bands',
type=int,
help="Number of frequency bands in daughterboard range to randomly retune to",
default=1)
return parser
def setup_tx_phase_alignment_parser(parser):
tx_group = parser.add_argument_group(
'TX Phase alignment specific arguments.')
tx_group.add_argument(
'--tx-channels', type=str, help='which channels to use')
tx_group.add_argument(
'--tx-antenna',
type=str,
help='comma-separated list of channel antennas for tx')
tx_group.add_argument(
'--tx-offset',
type=float,
help='frequency offset in Hz which should be added to center frequency for transmission'
)
return parser
def setup_rts_phase_alignment_parser(parser):
rts_group = parser.add_argument_group(
'RTS Phase alignment specific arguments')
rts_group.add_argument(
'-pd',
'--phasedev',
type=float,
default=1.0,
help='maximum phase standard deviation of dphi in a run which is considered settled (in deg)'
)
rts_group.add_argument(
'-dp',
'--dphi',
type=float,
default=2.0,
help='maximum allowed d_phase deviation between runs (in deg)')
rts_group.add_argument(
'--freqlist',
type=str,
help='comma-separated list of frequencies to test')
rts_group.add_argument(
'--lv-host',
type=str,
help='specify this argument if running tests with vst/switch')
rts_group.add_argument('--lv-vst-name', type=str, help='vst device name')
rts_group.add_argument(
'--lv-switch-name', type=str, help='executive switch name')
rts_group.add_argument(
'--lv-basepath',
type=str,
help='basepath for LabVIEW VIs on Windows')
rts_group.add_argument(
'--tx-offset',
type=float,
help='transmitter frequency offset in VST')
rts_group.add_argument(
'--lv-switch-ports', type=str, help='comma-separated switch-port pair')
return parser
def setup_manual_phase_alignment_parser(parser):
manual_group = parser.add_argument_group(
'Manual Phase alignment specific arguments')
manual_group.add_argument(
'--plot',
dest='plot',
action='store_true',
help='Set this argument to enable plotting results with matplotlib'
)
manual_group.add_argument(
'--auto',
action='store_true',
help='Set this argument to enable automatic selection of test frequencies'
)
manual_group.add_argument(
'--start-freq',
type=float,
default=0.0,
help='Start frequency for automatic selection'
),
manual_group.add_argument(
'--stop-freq',
type=float,
default=0.0,
help='Stop frequency for automatic selection')
parser.set_defaults(plot=False,auto=False)
return parser
def process_measurement_sinks(top_block):
data = list()
curr_data = dict()
for num, chan in enumerate(top_block.measurement_channels[:-1]):
curr_data['avg'] = list(top_block.measurement_sink[num].get_avg())
curr_data['stddev'] = list(top_block.measurement_sink[num].get_stddev(
))
curr_data['first'] = top_block.measurement_channels_names[num]
curr_data['second'] = top_block.measurement_channels_names[num + 1]
data.append(copy.copy(curr_data))
return data
def run_test(top_block, ntimes):
results = dict()
num_sinks = len(top_block.measurement_sink)
for i in xrange(ntimes):
#tune frequency to random position and back to specified frequency
top_block.retune_frequency(bands=top_block.uhd_app.args.freq_bands,band_num=i+1)
time.sleep(2)
#trigger start in all measurement_sinks
for sink in top_block.measurement_sink:
sink.start_run()
#wait until every measurement_sink is ready with the current run
while (sum([ms.get_run() for ms in top_block.measurement_sink]) < (
(i + 1) * num_sinks)):
time.sleep(1)
results = process_measurement_sinks(top_block)
return results
def log_level(string):
return getattr(logging, string.upper())
|