1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
|
#!/usr/bin/env python3
#
# Copyright 2018 Ettus Research, a National Instruments Company
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
"""
Embedded USRP filesystem update utility
"""
import argparse
import os
import subprocess
import sys
import tempfile
import time
import requests
# Device type as reported by MPM. Falls back to None when MPM cannot be
# imported; --device-type then becomes a required argument.
try:
    from usrp_mpm import __mpm_device__ as MPM_DEVICE
except ImportError:
    MPM_DEVICE = None
    print("ERROR: Could not import MPM.")
# Directory handed to uhd_images_downloader as install location (-i). The
# filesystem image is afterwards looked up in this directory.
DEFAULT_IMAGES_INSTALL_LOCATION = "/usr/share/uhd/images/"
# URL template of the manifest for a git tag or branch; {tag} is filled in by
# get_manifest_from_tag().
DEFAULT_REMOTE_MANIFEST_URL = (
    "https://raw.githubusercontent.com/EttusResearch/uhd/{tag}/images/manifest.txt"
)
# Device type -> file name of the Mender filesystem image inside
# DEFAULT_IMAGES_INSTALL_LOCATION.
DEFAULT_FS_IMAGE = {
    "e310_sg1": "usrp_e310_fs.mender",
    "e310_sg3": "usrp_e310_fs.mender",
    "e320": "usrp_e320_fs.mender",
    "n3xx": "usrp_n3xx_fs.mender",
    "x4xx": "usrp_x4xx_fs.mender",
}
# Device type -> target name passed to uhd_images_downloader (-t). The keys
# are also the accepted values of the --device-type option.
DEFAULT_MENDER_TARGET = {
    "e310_sg1": "e3xx_e310_sg1_mender_default",
    "e310_sg3": "e3xx_e310_sg3_mender_default",
    "e320": "e3xx_e320_mender_default",
    "n3xx": "n3xx_common_mender_default",
    "x4xx": "x4xx_common_mender_default",
}
def e31x_get_device_type():
    """Return the E31x device type, including its speed grade.

    The product ID is read from the header of the motherboard EEPROM and
    translated into one of the device type names used throughout this script.

    Raises:
        RuntimeError: The product ID is not one of the known E31x products.
    """
    assert MPM_DEVICE == "e31x"
    # Local import: the E31x EEPROM helpers are only used by this function.
    from usrp_mpm.e31x_legacy_eeprom import MboardEEPROM, read_eeprom

    device_type_by_pid = {
        0x77D2: "e310_sg1",
        0x77D3: "e310_sg3",
    }
    nvmem_path = "/sys/devices/soc0/amba/e0004000.i2c/i2c-0/0-0051/0-00510/nvmem"
    eeprom_contents = read_eeprom(
        True,
        nvmem_path,
        0,
        MboardEEPROM.eeprom_header_format,
        MboardEEPROM.eeprom_header_keys,
    )
    pid = eeprom_contents[0]["pid"]
    device_type = device_type_by_pid.get(pid)
    if device_type is None:
        raise RuntimeError(f"Unknown E31x product ID: 0x{pid:04X}")
    return device_type
def parse_args():
    """Build the command line parser and return the parsed arguments.

    If the device type comes out as the generic "e31x", it is replaced by the
    specific type (including speed grade) from e31x_get_device_type().
    """
    description = (
        "USRP filesystem update\n"
        "If run without any options, it will reset the filesystem to the state\n"
        'it was originally in, but with the same version ("factory reset").\n'
        "By using -m or -t, the precise version of the new filesystem can be\n"
        "selected. -t master will update to the very latest filesystem image.\n"
        "Most options require access to the internet to download the latest\n"
        "manifest and/or filesystem image.\n"
    )
    cli = argparse.ArgumentParser(description=description)
    cli.add_argument(
        "--image",
        help=(
            "Binary image of the filesystem update. "
            "If this is given, all other options are ignored. "
            "Can be a file or a URL"
        ),
    )
    cli.add_argument("-y", "--yes", action="store_true", help="Answer questions with yes")
    cli.add_argument(
        "-m",
        "--manifest",
        help="Manifest source. Can be a file or a URL. Overwrites -t",
    )
    # Without MPM there is no detected device type to fall back on, so the
    # user has to name one.
    cli.add_argument(
        "-d",
        "--device-type",
        default=MPM_DEVICE,
        required=MPM_DEVICE is None,
        choices=list(DEFAULT_MENDER_TARGET),
        help="Specify/overwrite device type (DANGER! May brick device if used incorrectly!)",
    )
    cli.add_argument(
        "-t",
        "--git-tag",
        help=(
            "Will try and locate the given git tag or branch and get the corresponding "
            "manifest. Ignored when -m is given. "
            "Using this requires internet access."
        ),
    )
    options = cli.parse_args()
    if options.device_type == "e31x":
        options.device_type = e31x_get_device_type()
    return options
def download_image(device_type, manifest_path=None):
    """Fetch the Mender image for device_type with uhd_images_downloader.

    The downloader is asked for the Mender target of the device type and told
    to install into DEFAULT_IMAGES_INSTALL_LOCATION. If manifest_path is not
    None, it is handed to the downloader with -m.

    Raises:
        KeyError: device_type has no entry in DEFAULT_MENDER_TARGET.
        subprocess.CalledProcessError: The downloader exited with an error.
    """
    manifest_option = [] if manifest_path is None else ["-m", manifest_path]
    subprocess.check_call(
        [
            "python3",
            "/usr/bin/uhd_images_downloader",
            "-t",
            DEFAULT_MENDER_TARGET[device_type],
            "-i",
            DEFAULT_IMAGES_INSTALL_LOCATION,
        ]
        + manifest_option
    )
def prepare_manifest(manifest):
    """Create a valid local path for a manifest.

    Args:
        manifest: Path of an existing local manifest file, or an http(s) URL
            to download the manifest from. Surrounding whitespace is ignored.

    Returns:
        The (stripped) input path if it names an existing file, otherwise the
        path of the downloaded copy inside a newly created temporary directory.

    Raises:
        RuntimeError: manifest is neither an existing file nor an http(s) URL.
        requests.RequestException: The download failed, which includes the
            server answering with an HTTP error status.
    """
    manifest = manifest.strip()
    if os.path.isfile(manifest):
        return manifest
    if manifest.startswith(("http://", "https://")):
        print("Downloading manifest file from {}...".format(manifest))
        # Without a timeout, a stalled connection would block forever.
        response = requests.get(manifest, timeout=30)
        # Fail here rather than saving an error page (e.g. the 404 body for a
        # misspelled git tag) as if it were the manifest.
        response.raise_for_status()
        # This will leave a stray directory in /tmp but that's OK... we're
        # blowing away the FS anyway
        manifest_path = os.path.join(tempfile.mkdtemp(), "manifest.txt")
        with open(manifest_path, "wb") as manifest_file:
            manifest_file.write(response.content)
        return manifest_path
    raise RuntimeError("Unknown manifest location type: " + manifest)
def get_manifest_from_tag(git_tag):
    """Return a local manifest path for the given git tag or branch.

    The tag is inserted into DEFAULT_REMOTE_MANIFEST_URL and the resulting URL
    is passed to prepare_manifest(), whose result is returned.
    """
    return prepare_manifest(DEFAULT_REMOTE_MANIFEST_URL.format(tag=git_tag))
def prepare_image(device_type, args):
    """Decide which Mender image to install and make sure it is available.

    An explicitly given image (args.image) is returned unchanged. Otherwise
    the image is fetched with the images downloader, using the manifest from
    args.manifest, else the one for args.git_tag, else the default manifest.

    Returns:
        The value of args.image, or the path of the image for device_type
        inside DEFAULT_IMAGES_INSTALL_LOCATION.
    """
    # An explicit image is the only case where the images downloader is not
    # invoked.
    if args.image is not None:
        print("Using image for upgrade: {}".format(args.image))
        return args.image

    if args.manifest is not None:
        print("Using manifest to download image: {}".format(args.manifest))
        manifest_path = prepare_manifest(args.manifest)
    elif args.git_tag:
        manifest_path = get_manifest_from_tag(args.git_tag)
    else:
        print("Using default manifest.")
        manifest_path = None

    # Note: The image downloader may choose to do nothing if the requested
    # image is already downloaded. It's safe to call anyway.
    download_image(device_type, manifest_path)
    image_name = DEFAULT_FS_IMAGE[device_type]
    return os.path.join(DEFAULT_IMAGES_INSTALL_LOCATION, image_name)
def apply_image(mender_image):
    """Install mender_image by running `mender install` on it.

    Raises:
        subprocess.CalledProcessError: mender exited with an error.
    """
    install_command = ["mender", "install", mender_image]
    subprocess.check_call(install_command)
def reboot():
    """Reboot the device after a three second countdown.

    The countdown gives the user the chance to cancel with Ctrl-C; the
    resulting KeyboardInterrupt is left to the caller.
    """
    print("Will reboot now. Hit Ctrl-C before the countdown expires to cancel.")
    for announcement in ("Rebooting in 3...", "2...", "1..."):
        print(announcement)
        time.sleep(1)
    os.system("reboot")
def run():
    """Main execution: get the filesystem image, apply it, offer a reboot.

    Returns:
        True once the image has been applied, whether or not the device is
        rebooted afterwards. False if no filesystem image could be identified.
    """
    args = parse_args()
    mender_image = prepare_image(args.device_type, args)
    if mender_image is None:
        print("Error: Unable to identify filesystem image!")
        return False
    apply_image(mender_image)
    print(
        "Applied image. After reboot, check if everything works, and then "
        "run the command '$ mender commit' to confirm (otherwise, this "
        "update will be undone)."
    )
    print("Note: Any data stored in this partition will be not accessible " "after reboot.")
    # The update itself has succeeded at this point. Aborting the prompt
    # (Ctrl-C, or EOF when stdin is not interactive) or the countdown only
    # means "do not reboot now"; it must not turn the run into a failure.
    try:
        if args.yes:
            reboot_now = "y"
        else:
            reboot_now = input("Reboot now? [Yn] ")
        # An empty reply selects the default, which is yes.
        if reboot_now.strip().lower() in ("", "y", "yes"):
            reboot()  # This will implicitly terminate the program, but it looks
            # neater if we have the return statement here
            return True
    except (KeyboardInterrupt, EOFError):
        pass
    print("Skipping reboot. Your device will load the updated file system " "on the next boot.")
    return True
def main():
    """Run the update and report whether it succeeded.

    Returns:
        The result of run(). If run() exits through SystemExit, True for a
        zero/empty exit status and False otherwise. False if any other
        exception escapes from run().
    """
    try:
        return run()
    except SystemExit as ex:
        # argparse exits with status 0 for --help, but with a non-zero status
        # when the arguments are invalid; only the former is a success.
        return not ex.code
    except BaseException as ex:
        print("Error: Unexpected exception caught!")
        print(str(ex))
        return False
if __name__ == "__main__":
    # main() reports success as True, which has to become exit status 0.
    succeeded = main()
    sys.exit(0 if succeeded else 1)
|