1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237
|
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""
import json
import os
import mbed_lstools
from time import sleep
from mbed_host_tests import DEFAULT_BAUD_RATE
import mbed_host_tests.host_tests_plugins as ht_plugins
from mbed_host_tests.host_tests_logger import HtrunLogger
class Mbed:
    """! Base class for a host driven test
    @details This class stores information about things like disk, port, serial speed etc.
             Class is also responsible for manipulation of serial port between host and mbed device
    """
    def __init__(self, options):
        """! Store copy / reset / serial settings read from the options object
        @param options Object exposing the attributes read below: port, disk, target_id,
                       image_path, copy_method, retry_copy, program_cycle_s, polling_timeout,
                       baud_rate and json_test_configuration
        @details If 'json_test_configuration' is set, the JSON file is loaded into self.test_cfg.
                 An I/O error while reading it is logged and self.test_cfg stays None.
                 Any other error (e.g. malformed JSON) is logged and re-raised.
        """
        # For compatibility with old mbed. Settings can come from parsed command line
        # options or from any object exposing the same attributes.
        self.options = options
        self.logger = HtrunLogger('MBED')

        # Options related to copy / reset mbed device
        self.port = self.options.port
        self.disk = self.options.disk
        self.target_id = self.options.target_id
        # Image path may arrive wrapped in double quotes; strip them
        self.image_path = self.options.image_path.strip('"') if self.options.image_path is not None else ''
        self.copy_method = self.options.copy_method
        self.retry_copy = self.options.retry_copy
        # Seconds to wait after each copy attempt (see copy_image); 2.0 when not specified
        self.program_cycle_s = float(self.options.program_cycle_s if self.options.program_cycle_s is not None else 2.0)
        self.polling_timeout = self.options.polling_timeout

        # Serial port settings
        self.serial_baud = DEFAULT_BAUD_RATE
        self.serial_timeout = 1

        # Users can pass port speed together with port name. E.g. COM4:115200:1
        # Format is PORT:SPEED:TIMEOUT
        port_config = self.port.split(':') if self.port else ''
        if len(port_config) in (2, 3):
            # -p COM4:115200
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
            if len(port_config) == 3:
                # -p COM4:115200:0.5
                self.serial_timeout = float(port_config[2])

        # Overriding baud rate value with command line specified value
        if self.options.baud_rate:
            self.serial_baud = self.options.baud_rate

        # Test configuration in JSON format
        self.test_cfg = None
        if self.options.json_test_configuration is not None:
            # We need to normalize path (strip surrounding quotes) before we open file
            json_test_configuration_path = self.options.json_test_configuration.strip("\"'")
            try:
                self.logger.prn_inf("Loading test configuration from '%s'..." % json_test_configuration_path)
                with open(json_test_configuration_path) as data_file:
                    self.test_cfg = json.load(data_file)
            except IOError as e:
                self.logger.prn_err("Test configuration JSON file '{0}' I/O error({1}): {2}"
                                    .format(json_test_configuration_path, e.errno, e.strerror))
            except Exception as e:
                # The exception must be bound here: the previous bare 'except:' referenced
                # an undefined 'e', which raised NameError and hid the real failure.
                # NOTE(review): message and error text are merged into one string on the
                # assumption that prn_err() takes a single text argument - confirm.
                self.logger.prn_err("Test configuration JSON Unexpected error: %s" % str(e))
                raise

    def copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5):
        """! Closure for copy_image_raw() method.
        @param image_path Path to the image file; falls back to self.image_path when falsy
        @param disk Destination disk / mount point; falls back to self.disk when falsy
        @param copy_method Copy plugin name; falls back to self.copy_method when falsy
        @param port Serial port name; falls back to self.port when falsy
        @param retry_copy Maximum number of copy attempts; falls back to self.retry_copy when falsy
        @return Returns result from copy plugin; False when the image path is missing,
                the image file does not exist or no copy attempt could be made
        @details After each attempt the method sleeps self.program_cycle_s seconds and,
                 if the copy succeeded, checks the target disk for 'FAIL.TXT'.
        """
        def get_remount_count(disk_path, tries=2):
            """! Get the remount count from 'DETAILS.TXT' file
            @return Returns count, None if not-available
            """
            for cur_try in range(1, tries + 1):
                try:
                    # File name is matched case-insensitively, so open it under the name
                    # actually present on disk rather than the upper-case literal
                    details_name = next((x for x in os.listdir(disk_path)
                                         if x.upper() == 'DETAILS.TXT'), None)
                    if details_name is None:
                        # 'DETAILS.TXT' file not found
                        return None
                    with open(os.path.join(disk_path, details_name), 'r') as details_txt:
                        for line in details_txt:
                            if 'Remount count:' in line:
                                return int(line.replace('Remount count: ', ''))
                    # Remount count not found in file
                    return None
                except (OSError, IOError) as e:
                    self.logger.prn_err("Failed to get remount count due to OSError. %s" % str(e))
                    self.logger.prn_inf("Retrying in 1 second (try %s of %s)" % (cur_try, tries))
                    sleep(1)
            # Failed to get remount count
            return None

        def check_flash_error(target_id, disk, initial_remount_count):
            """! Check for flash errors
            @return Returns false if FAIL.TXT present, else true
            """
            if not target_id:
                self.logger.prn_wrn("Target ID not found: Skipping flash check and retry")
                return True

            bad_files = set(['FAIL.TXT'])
            # Re-try at max 5 times with 0.5 sec in delay
            for _ in range(5):
                # mbed_lstools.create() should be done inside the loop. Otherwise it will loop on same data.
                mbeds = mbed_lstools.create()
                mbed_list = mbeds.list_mbeds()  # list of mbeds present
                # get first item in list with a matching target_id, if present
                mbed_target = next((x for x in mbed_list if x['target_id'] == target_id), None)

                if mbed_target is not None:
                    if 'mount_point' in mbed_target and mbed_target['mount_point'] is not None:
                        if initial_remount_count is not None:
                            new_remount_count = get_remount_count(disk)
                            if new_remount_count is not None and new_remount_count == initial_remount_count:
                                # Remount count unchanged since before the copy: poll again
                                sleep(0.5)
                                continue

                        common_items = []
                        try:
                            items = set([x.upper() for x in os.listdir(mbed_target['mount_point'])])
                            common_items = bad_files.intersection(items)
                        except OSError:
                            self.logger.prn_wrn("Failed to enumerate disk files, retrying")
                            # Pause like every other retry path instead of spinning
                            sleep(0.5)
                            continue

                        for common_item in common_items:
                            full_path = os.path.join(mbed_target['mount_point'], common_item)
                            self.logger.prn_err("Found %s" % (full_path))
                            bad_file_contents = "[failed to read bad file]"
                            try:
                                with open(full_path, "r") as bad_file:
                                    bad_file_contents = bad_file.read()
                            except IOError as error:
                                self.logger.prn_err("Error opening '%s': %s" % (full_path, error))
                            self.logger.prn_err("Error file contents:\n%s" % bad_file_contents)

                        if common_items:
                            return False
                sleep(0.5)
            return True

        # Set-up closure environment
        if not image_path:
            image_path = self.image_path
        if not disk:
            disk = self.disk
        if not copy_method:
            copy_method = self.copy_method
        if not port:
            port = self.port
        if not retry_copy:
            retry_copy = self.retry_copy
        target_id = self.target_id

        if not image_path:
            self.logger.prn_err("Error: image path not specified")
            return False

        if not os.path.isfile(image_path):
            self.logger.prn_err("Error: image file (%s) not found" % image_path)
            return False

        if not retry_copy:
            # Zero / None retry budget: nothing can be attempted. Report failure instead of
            # crashing on range(None) or on an unbound result after an empty loop.
            self.logger.prn_err("Error: copy retry count not specified, image not copied")
            return False

        result = False
        for _ in range(0, retry_copy):
            initial_remount_count = get_remount_count(disk)
            # Call proper copy method
            result = self.copy_image_raw(image_path, disk, copy_method, port)
            sleep(self.program_cycle_s)
            if not result:
                continue
            result = check_flash_error(target_id, disk, initial_remount_count)
            if result:
                break
        return result

    def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None):
        """! Copy file depending on method you want to use. Handles exception
             and return code from shell copy commands.
        @param image_path Where is binary with target's firmware
        @param disk Destination disk, passed to the plugin as 'destination_disk'
        @param copy_method Copy plugin name; None and 'default' both select 'shell'
        @param port Serial port name, passed to the plugin as 'serial'
        @return Returns result from copy plugin
        @details Method which is actually copying image to mbed
        """
        # Select copy_method
        # We override 'default' method with 'shell' method
        if copy_method is None or copy_method == 'default':
            copy_method = 'shell'

        # NOTE(review): keyword is spelled 'pooling_timeout' while the attribute is
        # 'polling_timeout' - kept as-is; verify which spelling the copy plugins read.
        return ht_plugins.call_plugin('CopyMethod',
                                      copy_method,
                                      image_path=image_path,
                                      serial=port,
                                      destination_disk=disk,
                                      target_id=self.target_id,
                                      pooling_timeout=self.polling_timeout)

    def hw_reset(self):
        """! Performs hardware reset of the target device via the 'power_cycle' reset plugin.
        @return Returns result from reset plugin
        @details On success self.port and self.disk are refreshed from the 'serial_port'
                 and 'mount_point' entries the plugin stored in device_info.
        """
        # Filled in by the plugin call
        device_info = {}
        result = ht_plugins.call_plugin('ResetMethod',
                                        'power_cycle',
                                        target_id=self.target_id,
                                        device_info=device_info)
        if result:
            self.port = device_info['serial_port']
            self.disk = device_info['mount_point']
        return result
|