1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
|
#
# Copyright 2018 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
MPM Component management
"""
import os
import re
import shutil
import subprocess
from usrp_mpm.rpc_utils import no_rpc
class ZynqComponents:
"""
Mixin class that update Zynq FPGA and devicetree components.
Note: this assumes that the class that you are mixing ZynqComponents into
initializes `updateable_components`, `device_info`, and `log`.
"""
# Declare required members here
updateable_components = {}
device_info = {}
# Note: the logger is created by derived class (the class we are mixing
# into), so all logs in the ZynqComponents class will be under the
# derived class's category.
log = None
###########################################################################
# Component updating
###########################################################################
def _log_and_raise(self, logstr):
self.log.error(logstr)
raise RuntimeError(logstr)
@classmethod
def _parse_dts_mpm_version_tag(cls, text):
""" parse a version line from the dts file. E.g.
"// mpm_version component1 3.4.5" will return
{"component1": (3, 4, 5)} """
dts_version_re = re.compile(r'^// mpm_version\s+(?P<comp>\S+)\s+(?P<ver>\S+)$')
match = dts_version_re.match(text)
if match is None:
return (None, None)
component = match[1]
version_list = [int(x, base=0) for x in match[2].split('.')]
return (component, tuple(version_list))
@classmethod
def _parse_dts_version_info_from_file(cls, filepath):
"""
parse all version informations from dts file and store in dict
a c-style comment in the dts file like this
// mpm_version component1 3.4.5
// mpm_version other_component 3.5.0
will return a dict:
{"component1": (3, 4, 5), "other_component": (3, 5, 0)"}
"""
suffix_current = "_current_version"
suffix_oldest = "_oldest_compatible_version"
version_info = {}
with open(filepath) as f:
text = f.read()
for line in text.splitlines():
component, version = cls._parse_dts_mpm_version_tag(line)
if not component:
continue
if component.endswith(suffix_oldest):
component = component[:-len(suffix_oldest)]
version_type = 'oldest'
elif component.endswith(suffix_current):
component = component[:-len(suffix_current)]
version_type = 'current'
else:
version_type = 'current'
if component not in version_info:
version_info[component] = {}
version_info[component][version_type] = version
return version_info
def _merge_updateable_components(self):
"""
Combines updateable_components from MB and all DBs and returns the
merged result.
"""
def merge_dicts(origd, newd):
"""
https://stackoverflow.com/questions/71396284/python-how-to-recursively-merge-2-dictionaries
"""
for key, val in newd.items():
if key not in origd:
origd[key] = val
continue
if isinstance(val, dict):
if not isinstance(origd[key], dict):
origd[key] = {}
merge_dicts(origd[key], val)
else:
origd[key] = val
return origd
upc = self.updateable_components
for dboard in self.dboards:
upc = merge_dicts(upc, dboard.updateable_components)
return upc
def _verify_compatibility(self, filebasename, update_dict):
"""
check whether the given MPM compatibility matches the
version information stored in the FPGA DTS file
"""
def _get_version_string(versions_enum):
version_strings = []
if 'current' in versions_enum:
version_strings.append("current: {}".format(
".".join([str(x) for x in versions_enum['current']])))
if 'oldest' in versions_enum:
version_strings.append("oldest compatible: {}".format(
".".join([str(x) for x in versions_enum['oldest']])))
return ', '.join(version_strings)
if update_dict.get('check_dts_for_compatibility'):
self.log.trace("Compatibility check MPM <-> FPGA via DTS enabled")
dtsfilepath = filebasename + '.dts'
if not os.path.exists(dtsfilepath):
self.log.warning(f"DTS file not found: {dtsfilepath}, cannot "
f"check version of bitfile without DTS.")
return
self.log.trace(f"Parsing DTS file {dtsfilepath} for version information")
fpga_versions = self._parse_dts_version_info_from_file(dtsfilepath)
if not fpga_versions:
self._log_and_raise("no component version information in DTS file")
if 'compatibility' not in update_dict:
self._log_and_raise("MPM FPGA version infos not found")
mpm_versions = update_dict['compatibility']
self.log.trace(f"DTS version infos: {fpga_versions}")
self.log.trace(f"MPM version infos: {mpm_versions}")
try:
for component in mpm_versions.keys():
# check for components that aren't available in the DTS file
if component in fpga_versions.keys():
self.log.trace(f"check compatibility for: FPGA-{component}")
mpm_version = mpm_versions[component]
fpga_version = fpga_versions[component]
self.log.trace("mpm_version: current: {}, compatible: {}".format(
mpm_version['current'], mpm_version['oldest']))
self.log.trace("fpga_version: current: {}, compatible: {}".format(
fpga_version['current'], fpga_version['oldest']))
if mpm_version['oldest'][0] > fpga_version['current'][0]:
error = "Component {} is too old ({}, MPM version: {})".format(
component,
_get_version_string(fpga_version),
_get_version_string(mpm_version))
self._log_and_raise(error)
elif mpm_version['current'][0] < fpga_version['oldest'][0]:
error = "Component {} is too new ({}, MPM version: {})".format(
component,
_get_version_string(fpga_version),
_get_version_string(mpm_version))
self._log_and_raise(error)
self.log.trace(f"Component {component} is good!")
else:
self.log.warning(f"component {component} defined in "\
f"MPM but not found in FPGA info, skipping.")
except RuntimeError as ex:
self._log_and_raise("MPM compatibility infos suggest that the "\
"new bitfile is not compatible, skipping installation. {}"\
.format(ex))
else:
self.log.trace("Compatibility check MPM <-> FPGA is disabled")
return
@no_rpc
def update_fpga(self, filepath, metadata):
"""
Update the FPGA image in the filesystem and reload the overlay
:param filepath: path to new FPGA image
:param metadata: Dictionary of strings containing metadata
"""
self.log.trace(f"Updating FPGA with image at {filepath}"\
" (metadata: `{str(metadata)}')")
file_name, file_extension = os.path.splitext(filepath)
# Cut off the period from the file extension
file_extension = file_extension[1:].lower()
if file_extension not in ['bit', 'bin']:
self._log_and_raise(f"Invalid FPGA bitfile: {filepath}")
updc = self._merge_updateable_components()
assert 'path' in updc['fpga']
binfile_path = updc['fpga']['path'].format(self.device_info.get('product'))
self._verify_compatibility(file_name, updc['fpga'])
if file_extension == "bit":
self.log.trace(
f"Converting bit to bin file and writing to {binfile_path}")
from usrp_mpm.fpga_bit_to_bin import fpga_bit_to_bin
fpga_bit_to_bin(filepath, binfile_path, flip=True)
elif file_extension == "bin":
self.log.trace("Copying bin file to %s", binfile_path)
shutil.copy(filepath, binfile_path)
# RPC server will reload the periph manager after this.
return True
@no_rpc
def update_dts(self, filepath, metadata):
"""
Update the DTS image in the filesystem
:param filepath: path to new DTS image
:param metadata: Dictionary of strings containing metadata
"""
dtsfile_path = self._merge_updateable_components()['dts']['path'].format(
self.device_info.get('product'))
self.log.trace("Updating DTS with image at %s to %s (metadata: %s)",
filepath, dtsfile_path, str(metadata))
shutil.copy(filepath, dtsfile_path)
dtbofile_path = self._merge_updateable_components()['dts']['output'].format(
self.device_info.get('product'))
self.log.trace("Compiling to %s...", dtbofile_path)
dtc_command = [
'dtc',
'--symbols',
'-O', 'dtb',
'-q', # Suppress warnings
'-o',
dtbofile_path,
dtsfile_path,
]
self.log.trace("Executing command: `$ %s'", " ".join(dtc_command))
try:
out = subprocess.check_output(dtc_command)
if out.strip() != "":
# Keep this as debug because dtc is an external tool and
# something could go wrong with it that's outside of our control
self.log.debug("`dtc' command output: \n%s", out)
except OSError:
self.log.error("Could not execute `dtc' command. Binary probably "
"not installed. Please compile DTS by hand.")
# No fatal error here, in order not to break the current workflow
except subprocess.CalledProcessError as ex:
self.log.error("Error executing `dtc': %s", str(ex))
return False
return True
|