1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
|
#
# Copyright 2017 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Aurora SFP control
"""
import math
import time
from builtins import str
from builtins import object
from usrp_mpm.mpmlog import get_logger
def mean(vals):
" Calculate arithmetic mean of vals "
return float(sum(vals)) / max(len(vals), 1)
def stddev(vals, mu=None):
" Calculate std deviation of vals "
mu = mu or mean(vals)
return float(sum(((x - mu)**2 for x in vals))) / max(len(vals), 1)
class AuroraControl(object):
"""
Controls an Aurora core.
"""
# These are relative addresses
REG_AURORA_PORT_INFO = 0x00
REG_AURORA_MAC_CTRL_STATUS = 0x04
REG_AURORA_PHY_CTRL_STATUS = 0x08
REG_AURORA_OVERRUNS = 0x20
REG_CHECKSUM_ERRORS = 0x24
REG_BIST_CHECKER_SAMPS = 0x28
REG_BIST_CHECKER_ERRORS = 0x2C
MAC_STATUS_LINK_UP_MSK = 0x00000001
MAC_STATUS_HARD_ERR_MSK = 0x00000002
MAC_STATUS_SOFT_ERR_MSK = 0x00000004
MAC_STATUS_BIST_LOCKED_MSK = 0x00000008
MAC_STATUS_BIST_LATENCY_MSK = 0x000FFFF0
MAC_STATUS_BIST_LATENCY_OFFSET = 4
RATE_RES_BITS = 6
DEFAULT_BUS_CLK_RATE = 200e6
def __init__(self, peeker_poker32, base_addr=None, bus_clk_rate=None):
assert hasattr(peeker_poker32, 'peek32') \
and callable(peeker_poker32.peek32)
assert hasattr(peeker_poker32, 'poke32') \
and callable(peeker_poker32.poke32)
self.log = get_logger("AuroraCore")
self._regs = peeker_poker32
base_addr = base_addr or 0
self.log.debug("Base address in register space is: 0x{:04X}".format(
base_addr
))
self.poke32 = lambda addr, data: self._regs.poke32(
addr + base_addr, data
)
self.peek32 = lambda addr: self._regs.peek32(addr + base_addr)
self.mac_ctrl = 0x000
self.set_mac_ctrl(self.mac_ctrl)
time.sleep(.5)
self.bus_clk_rate = bus_clk_rate
if self.bus_clk_rate is None:
self.bus_clk_rate = self.DEFAULT_BUS_CLK_RATE
self.log.warning("Unspecified bus clock rate. Assuming default "
"rate of {} MHz.".format(self.bus_clk_rate/1e6))
else:
self.log.debug("Bus clock rate: {} MHz.".format(
self.bus_clk_rate/1e6
))
self.bist_max_time_limit = math.floor(2**48/self.bus_clk_rate)-1
self.log.debug("BIST max time limit: {} s".format(
self.bist_max_time_limit
))
self.log.debug("Status of PHY link: 0x{:08X}".format(
self.read_phy_ctrl_status()
))
if not self.is_phy_link_up():
raise RuntimeError("PHY link not up. Check connectors.")
def read_mac_ctrl_status(self):
" Return MAC ctrl status word from core "
return self.peek32(self.REG_AURORA_MAC_CTRL_STATUS)
def read_phy_ctrl_status(self):
" Return PHY ctrl status word from core "
return self.peek32(self.REG_AURORA_PHY_CTRL_STATUS)
def read_overruns(self):
" Return overrun count from core "
return self.peek32(self.REG_AURORA_OVERRUNS)
def read_checksum_errors(self):
" Return checksum error count from core "
return self.peek32(self.REG_CHECKSUM_ERRORS)
def read_bist_checker_samps(self):
" Return number of samps processed from core "
return self.peek32(self.REG_BIST_CHECKER_SAMPS)
def read_bist_checker_errors(self):
" Return number of errors from core "
return self.peek32(self.REG_BIST_CHECKER_ERRORS)
def set_mac_ctrl(self, mac_ctrl_word):
" Write to the MAC ctrl register "
self.log.debug("Setting MAC ctrl word to: 0x{:08X}".format(mac_ctrl_word))
self.poke32(self.REG_AURORA_MAC_CTRL_STATUS, mac_ctrl_word)
def set_bist_checker_and_gen(self, enable):
" Enable or disable Aurora BIST: Checker + Generator "
if enable:
self.log.info("Enable Aurora BIST Checker and Gen")
self.mac_ctrl = self.mac_ctrl | 0b11
else:
self.log.info("Disable Aurora BIST Checker and Gen")
self.mac_ctrl = self.mac_ctrl & 0xFFFFFFFC
self.set_mac_ctrl(self.mac_ctrl)
def set_bist_checker(self, enable):
" Enable or disable Aurora BIST: Checker only "
if enable:
self.log.info("Enable Aurora BIST Checker")
self.mac_ctrl = self.mac_ctrl | 0b01
else:
self.log.info("Disable Aurora BIST Checker")
self.mac_ctrl = self.mac_ctrl & 0xFFFFFFFE
self.set_mac_ctrl(self.mac_ctrl)
def set_bist_gen(self, enable):
" Enable or disable Aurora BIST: Generator only "
if enable:
self.log.info("Enable Aurora BIST Gen")
self.mac_ctrl = self.mac_ctrl | 0b10
else:
self.log.info("Disable Aurora BIST Gen")
self.mac_ctrl = self.mac_ctrl & 0xFFFFFFFD
self.set_mac_ctrl(self.mac_ctrl)
def set_loopback(self, enable):
" Enable or disable Aurora loopback mode "
if enable:
self.log.info("Enable Aurora loopback")
self.mac_ctrl = self.mac_ctrl | 0b100
else:
self.log.info("Disable Aurora loopback")
self.mac_ctrl = self.mac_ctrl & 0xFFFFFFFB
self.set_mac_ctrl(self.mac_ctrl)
def set_bist_rate(self, rate_word):
" Set BIST rate. It's a 6-bit value in the MAC ctrl register. "
self.log.debug("Setting Aurora BIST rate word to 0x{:02X}".format(
rate_word
))
self.mac_ctrl = self.mac_ctrl | ((rate_word & 0x3F) << 3)
self.set_mac_ctrl(self.mac_ctrl)
def reset_phy(self):
" Reset Aurora PHY "
self.log.debug("Reset Aurora PHY")
self.mac_ctrl = self.mac_ctrl | (1<<9)
self.set_mac_ctrl(self.mac_ctrl)
self.clear_control_reg()
def clear_mac(self):
" Clear Aurora MAC "
self.log.debug("Clear Aurora MAC")
self.mac_ctrl = self.mac_ctrl | (1<<10)
self.set_mac_ctrl(self.mac_ctrl)
self.clear_control_reg()
def clear_control_reg(self):
" Zero out Aurora control register "
self.mac_ctrl = 0
self.set_mac_ctrl(self.mac_ctrl)
def get_rate_setting(self, requested_rate, bus_clk_rate):
"""
From a requested bit rate, return the value for the rate register, and
the coerced value.
"""
max_rate_word = 2**self.RATE_RES_BITS - 1
lines_per_clock = float(requested_rate) / 64 / bus_clk_rate
rate_word = int(lines_per_clock * 2**self.RATE_RES_BITS) - 1
rate_word = min(max(rate_word, 0), max_rate_word)
coerced_rate = ((1+rate_word) / 2**self.RATE_RES_BITS) \
* 64 * bus_clk_rate
return rate_word, coerced_rate
def is_phy_link_up(self):
"""
Return True if the PHY link was successfully negotiated.
"""
return bool(self.read_phy_ctrl_status() & 0x1)
def reset_core(self):
" Reset MAC. PHY reset not necessary"
self.clear_control_reg()
self.clear_mac()
def run_latency_loopback_bist(
self,
duration,
requested_rate,
slave=None,
):
"""
Run latency loopback BIST
slave -- the other sfp core gets set to loopback mode
ctrl -- sorta the master sfp core
duration -- time we want to run the bist
requested_rate -- Requested BIST rate in bits/s
"""
rate_word, coerced_rate = \
self.get_rate_setting(requested_rate, self.bus_clk_rate)
self.log.info(
'Running Latency Loopback BIST at %.0fMB/s for %.0fs...',
coerced_rate/8e6, duration
)
self._pre_test_init(slave)
start_time = time.time()
results = {
'latencies': [],
'mst_lock_errors': 0,
'mst_hard_errors': 0,
'mst_overruns': 0,
}
try:
for _ in range(duration*10):
self.set_bist_rate(rate_word)
self.set_bist_checker_and_gen(enable=True)
# Wait and check if BIST locked
time.sleep(0.05)
mst_status = self.read_mac_ctrl_status()
if not mst_status & self.MAC_STATUS_BIST_LOCKED_MSK:
results['mst_lock_errors'] += 1
self.log.info('lock errors: %d', results['mst_lock_errors'])
# Turn off the BIST generator
self.set_bist_gen(0)
# Validate status and no overruns
mst_status = self.read_mac_ctrl_status()
results['mst_overruns'] = self.read_overruns()
if mst_status & self.MAC_STATUS_HARD_ERR_MSK:
results['mst_hard_errors'] += 1
time.sleep(0.05)
self.clear_control_reg()
# Compute latency
results['latencies'].append(
self._mst_status_to_latency_us(mst_status)
)
except KeyboardInterrupt:
self.log.warning('Operation cancelled by user.')
stop_time = time.time()
# Report
if results['mst_lock_errors'] > 0:
self.log.error(
'BIST engine did not lock onto a PRBS word %d times!',
results['mst_lock_errors']
)
if results['mst_hard_errors'] > 0:
self.log.error(
'There were %d hard errors in master PHY',
results['mst_hard_errors']
)
if results['mst_overruns'] > 0:
self.log.error(
'There were %d buffer overruns in master PHY',
results['mst_overruns']
)
mu_lat = mean(results['latencies'])
results['elapsed_time'] = stop_time - start_time
self.log.info('BIST Complete!')
self.log.info('- Elapsed Time = ' + str(results['elapsed_time']))
self.log.info('- Roundtrip Latency Mean = %.2fus', mu_lat)
self.log.info('- Roundtrip Latency Stdev = %.6fus',
stddev(results['latencies'], mu=mu_lat))
# Turn off BIST loopback
time.sleep(0.5)
if slave is not None:
results['sla_overruns'], results['sla_hard_errors'] = \
self._get_slave_status(slave)
self._post_test_cleanup(slave)
return results
def run_ber_loopback_bist(self, duration, requested_rate, slave=None):
"""
Run BER Bist. Pump lots of bits through, and see how many come back
correctly.
duration -- Time to run the test in seconds
"""
rate_word, coerced_rate = \
self.get_rate_setting(requested_rate, self.bus_clk_rate)
self.log.info('Running BER Loopback BIST at {}MB/s for {}s...'.format(
coerced_rate/8e6, duration
))
self._pre_test_init(slave)
mst_overruns = 0
self.log.info("Starting BER test...")
start_time = time.time()
self.set_bist_rate(rate_word)
self.set_bist_checker_and_gen(enable=True)
# Wait and check if BIST locked
time.sleep(0.5)
mst_status = self.read_mac_ctrl_status()
if not mst_status & self.MAC_STATUS_BIST_LOCKED_MSK:
error_msg = 'BIST engine did not lock onto a PRBS word! ' \
'MAC status word: 0x{:08X}'.format(mst_status)
self.log.error(error_msg)
raise RuntimeError(error_msg)
# Wait for requested time
try:
time.sleep(duration)
except KeyboardInterrupt:
self.log.warning('Operation cancelled by user.')
# Turn off the BIST generator and loopback
self.set_bist_gen(enable=False)
results = {}
results['time_elapsed'] = time.time() - start_time
time.sleep(0.5)
# Validate status and no overruns
mst_status = self.read_mac_ctrl_status()
results['mst_overruns'] = self.read_overruns()
results['mst_samps'] = 65536 * self.read_bist_checker_samps()
results['mst_errors'] = self.read_bist_checker_errors()
if mst_status & self.MAC_STATUS_HARD_ERR_MSK:
self.log.error('Hard errors in master PHY')
results['mst_hard_errors'] = True
if mst_overruns > 0:
self.log.error('Buffer overruns in master PHY')
if slave is not None:
results['sla_overruns'], results['sla_hard_errors'] = \
self._get_slave_status(slave)
if results['mst_samps'] != 0:
results['mst_latency_us'] = \
self._mst_status_to_latency_us(mst_status)
self.log.info('BIST Complete!')
self.log.info('- Elapsed Time = {:.2} s'.format(
results['time_elapsed']
))
results['max_ber'] = \
float(results['mst_errors']+1) / results['mst_samps']
results['approx_throughput'] = \
(8 * results['mst_samps']) / results['time_elapsed']
self.log.info('- Max BER (Bit Error Ratio) = %.4g ' \
'(%d errors out of %d)',
results['max_ber'],
results['mst_errors'],
results['mst_samps'])
self.log.info('- Max Roundtrip Latency = %.1fus',
results['mst_latency_us'])
self.log.info('- Approx Throughput = %.0fMB/s',
results['approx_throughput'] / 1e6)
else:
self.log.error('No samples received -- BIST Failed!')
self._post_test_cleanup(slave)
return results
def _get_slave_status(self, slave):
"""
Read back status from the slave
"""
slave.clear_control_reg()
sla_status = slave.read_mac_ctrl_status()
sla_overruns = slave.read_overruns()
sla_hard_errors = 0
if sla_status & self.MAC_STATUS_HARD_ERR_MSK:
self.log.error('Hard errors in slave PHY')
sla_hard_errors = slave.read_overruns()
if sla_overruns > 0:
self.log.error('Buffer overruns in slave PHY')
return sla_overruns, sla_hard_errors
def _mst_status_to_latency_us(self, mst_status):
" Convert a MAC status word into latency in microseconds "
latency_cyc = 16.0 * \
((mst_status & self.MAC_STATUS_BIST_LATENCY_MSK) \
>> self.MAC_STATUS_BIST_LATENCY_OFFSET)
return 1e6 * latency_cyc / self.bus_clk_rate
def _pre_test_init(self, slave=None):
" Set up core(s) for BISTing "
self.reset_core()
if slave is not None:
slave.reset_core()
time.sleep(1.5)
if slave is not None:
self.set_loopback(enable=False)
slave.set_loopback(enable=True)
self.log.debug("Status of PHY link: 0x{:08X}".format(
self.read_phy_ctrl_status()
))
if not self.is_phy_link_up():
raise RuntimeError("PHY link not up. Check connectors.")
def _post_test_cleanup(self, slave=None):
" Drain and Cleanup "
self.log.info('Cleaning up...')
self.set_bist_checker(enable=True)
if slave is not None:
slave.set_bist_checker(enable=True)
time.sleep(0.5)
self.clear_control_reg()
if slave is not None:
slave.clear_control_reg()
|