File: mbed_base.py

package info (click to toggle)
python-mbed-host-tests 1.4.4-11
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 732 kB
  • sloc: python: 3,141; makefile: 27; sh: 14
file content (237 lines) | stat: -rw-r--r-- 10,461 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Author: Przemyslaw Wirkus <Przemyslaw.Wirkus@arm.com>
"""

import json
import os
import mbed_lstools
from time import sleep
from mbed_host_tests import DEFAULT_BAUD_RATE
import mbed_host_tests.host_tests_plugins as ht_plugins
from mbed_host_tests.host_tests_logger import HtrunLogger


class Mbed:
    """! Base class for a host driven test
    @details This class stores information about things like disk, port, serial speed etc.
             Class is also responsible for manipulation of serial port between host and mbed device
    """
    def __init__(self, options):
        """ ctor
        """
        # For compatibility with old mbed. We can use command line options for Mbed object
        # or we can pass options directly from .
        self.options = options
        self.logger = HtrunLogger('MBED')
        # Options related to copy / reset mbed device
        self.port = self.options.port
        self.disk = self.options.disk
        self.target_id = self.options.target_id
        self.image_path = self.options.image_path.strip('"') if self.options.image_path is not None else ''
        self.copy_method = self.options.copy_method
        self.retry_copy = self.options.retry_copy
        self.program_cycle_s = float(self.options.program_cycle_s if self.options.program_cycle_s is not None else 2.0)
        self.polling_timeout = self.options.polling_timeout

        # Serial port settings
        self.serial_baud = DEFAULT_BAUD_RATE
        self.serial_timeout = 1

        # Users can use command to pass port speeds together with port name. E.g. COM4:115200:1
        # Format if PORT:SPEED:TIMEOUT
        port_config = self.port.split(':') if self.port else ''
        if len(port_config) == 2:
            # -p COM4:115200
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
        elif len(port_config) == 3:
            # -p COM4:115200:0.5
            self.port = port_config[0]
            self.serial_baud = int(port_config[1])
            self.serial_timeout = float(port_config[2])

        # Overriding baud rate value with command line specified value
        self.serial_baud = self.options.baud_rate if self.options.baud_rate else self.serial_baud

        # Test configuration in JSON format
        self.test_cfg = None
        if self.options.json_test_configuration is not None:
            # We need to normalize path before we open file
            json_test_configuration_path = self.options.json_test_configuration.strip("\"'")
            try:
                self.logger.prn_inf("Loading test configuration from '%s'..." % json_test_configuration_path)
                with open(json_test_configuration_path) as data_file:
                    self.test_cfg = json.load(data_file)
            except IOError as e:
                self.logger.prn_err("Test configuration JSON file '{0}' I/O error({1}): {2}"
                                    .format(json_test_configuration_path, e.errno, e.strerror))
            except:
                self.logger.prn_err("Test configuration JSON Unexpected error:", str(e))
                raise

    def copy_image(self, image_path=None, disk=None, copy_method=None, port=None, retry_copy=5):
        """! Closure for copy_image_raw() method.
        @return Returns result from copy plugin
        """
        def get_remount_count(disk_path, tries=2):
            """! Get the remount count from 'DETAILS.TXT' file
            @return Returns count, None if not-available
            """
            for cur_try in range(1, tries + 1):
                try:
                    files_on_disk = [x.upper() for x in os.listdir(disk_path)]
                    if 'DETAILS.TXT' in files_on_disk:
                        with open(os.path.join(disk_path, 'DETAILS.TXT'), 'r') as details_txt:
                            for line in details_txt.readlines():
                                if 'Remount count:' in line:
                                    return int(line.replace('Remount count: ', ''))
                            # Remount count not found in file
                            return None
                    # 'DETAILS.TXT file not found
                    else:
                        return None

                except OSError as e:
                    self.logger.prn_err("Failed to get remount count due to OSError.", str(e))
                    self.logger.prn_inf("Retrying in 1 second (try %s of %s)" % (cur_try, tries))
                    sleep(1)
            # Failed to get remount count
            return None

        def check_flash_error(target_id, disk, initial_remount_count):
            """! Check for flash errors
            @return Returns false if FAIL.TXT present, else true
            """
            if not target_id:
                self.logger.prn_wrn("Target ID not found: Skipping flash check and retry")
                return True

            bad_files = set(['FAIL.TXT'])
            # Re-try at max 5 times with 0.5 sec in delay
            for i in range(5):
                # mbed_lstools.create() should be done inside the loop. Otherwise it will loop on same data.
                mbeds = mbed_lstools.create()
                mbed_list = mbeds.list_mbeds() #list of mbeds present
                # get first item in list with a matching target_id, if present
                mbed_target = next((x for x in mbed_list if x['target_id']==target_id), None)

                if mbed_target is not None:
                    if 'mount_point' in mbed_target and mbed_target['mount_point'] is not None:
                        if not initial_remount_count is None:
                            new_remount_count = get_remount_count(disk)
                            if not new_remount_count is None and new_remount_count == initial_remount_count:
                                sleep(0.5)
                                continue

                        common_items = []
                        try:
                            items = set([x.upper() for x in os.listdir(mbed_target['mount_point'])])
                            common_items = bad_files.intersection(items)
                        except OSError as e:
                            print("Failed to enumerate disk files, retrying")
                            continue

                        for common_item in common_items:
                            full_path = os.path.join(mbed_target['mount_point'], common_item)
                            self.logger.prn_err("Found %s"% (full_path))
                            bad_file_contents = "[failed to read bad file]"
                            try:
                                with open(full_path, "r") as bad_file:
                                    bad_file_contents = bad_file.read()
                            except IOError as error:
                                self.logger.prn_err("Error opening '%s': %s" % (full_path, error))

                            self.logger.prn_err("Error file contents:\n%s" % bad_file_contents)
                        if common_items:
                            return False
                sleep(0.5)
            return True

        # Set-up closure environment
        if not image_path:
            image_path = self.image_path
        if not disk:
            disk = self.disk
        if not copy_method:
            copy_method = self.copy_method
        if not port:
            port = self.port
        if not retry_copy:
            retry_copy = self.retry_copy
        target_id = self.target_id

        if not image_path:
            self.logger.prn_err("Error: image path not specified")
            return False

        if not os.path.isfile(image_path):
            self.logger.prn_err("Error: image file (%s) not found" % image_path)
            return False

        for count in range(0, retry_copy):
            initial_remount_count = get_remount_count(disk)
            # Call proper copy method
            result = self.copy_image_raw(image_path, disk, copy_method, port)
            sleep(self.program_cycle_s)
            if not result:
                continue
            result = check_flash_error(target_id, disk, initial_remount_count)
            if result:
                break
        return result

    def copy_image_raw(self, image_path=None, disk=None, copy_method=None, port=None):
        """! Copy file depending on method you want to use. Handles exception
             and return code from shell copy commands.
        @return Returns result from copy plugin
        @details Method which is actually copying image to mbed
        """
        # image_path - Where is binary with target's firmware

        # Select copy_method
        # We override 'default' method with 'shell' method
        copy_method = {
            None : 'shell',
            'default' : 'shell',
        }.get(copy_method, copy_method)

        result = ht_plugins.call_plugin('CopyMethod',
                                        copy_method,
                                        image_path=image_path,
                                        serial=port,
                                        destination_disk=disk,
                                        target_id=self.target_id,
                                        pooling_timeout=self.polling_timeout)
        return result

    def hw_reset(self):
        """
        Performs hardware reset of target ned device.

        :return:
        """
        device_info = {}
        result = ht_plugins.call_plugin('ResetMethod',
                                        'power_cycle',
                                        target_id=self.target_id,
                                        device_info=device_info)
        if result:
            self.port = device_info['serial_port']
            self.disk = device_info['mount_point']
        return result