File: conn_primitive_serial.py

package info (click to toggle)
python-mbed-host-tests 1.4.4-11
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 732 kB
  • sloc: python: 3,141; makefile: 27; sh: 14
file content (144 lines) | stat: -rw-r--r-- 5,996 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
"""
mbed SDK
Copyright (c) 2011-2016 ARM Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""


import time
from serial import Serial, SerialException
from mbed_host_tests import host_tests_plugins
from mbed_host_tests.host_tests_plugins.host_test_plugins import HostTestPluginBase
from .conn_primitive import ConnectorPrimitive, ConnectorPrimitiveException


class SerialConnectorPrimitive(ConnectorPrimitive):
    """! Connection primitive which talks to the device over a serial port

    @details The serial port is opened in the constructor (with retries until
             'polling_timeout' expires) and, unless 'skip_reset' is set in the
             config, the device is reset through a 'ResetMethod' plugin right
             after the port is opened.
             When read() / write() hit a SerialException the port handle is
             dropped (self.serial = None), the reason is stored in
             self.LAST_ERROR and connected() starts returning False.
    """

    def __init__(self, name, port, baudrate, config):
        """! Open serial connection to the device and optionally reset it

        @param name Name forwarded to ConnectorPrimitive.__init__()
        @param port Serial port name to open
        @param baudrate Serial port baudrate (anything int() accepts)
        @param config Dictionary with connection configuration. Keys read here:
                      'target_id', 'polling_timeout' (default 60),
                      'forced_reset_timeout' (default 1), 'skip_reset' (default False)
        @raise ConnectorPrimitiveException when check_serial_port_ready() returns None
        @details If the port cannot be opened before 'polling_timeout' expires no
                 exception is raised: self.serial stays None and connected()
                 returns False.
        """
        ConnectorPrimitive.__init__(self, name)
        # Assigned before anything below may raise (e.g. int(baudrate)) so that
        # finish() / __del__() always find the attribute
        self.serial = None
        self.port = port
        self.baudrate = int(baudrate)
        self.read_timeout = 0.01  # 10 milli sec
        self.write_timeout = 5
        self.config = config
        self.target_id = self.config.get('target_id', None)
        self.polling_timeout = config.get('polling_timeout', 60)
        self.forced_reset_timeout = config.get('forced_reset_timeout', 1)
        self.skip_reset = config.get('skip_reset', False)

        # Check if serial port for given target_id changed
        # If it does we will use new port to open connections and make sure reset plugin
        # later can reuse opened already serial port
        #
        # Note: This listener opens serial port and keeps connection so reset plugin uses
        # serial port object not serial port name!
        serial_port = HostTestPluginBase().check_serial_port_ready(self.port, target_id=self.target_id, timeout=self.polling_timeout)
        if serial_port is None:
            raise ConnectorPrimitiveException("Serial port not ready!")

        if serial_port != self.port:
            # Serial port changed for given targetID
            self.logger.prn_inf("serial port changed from '%s' to '%s'"% (self.port, serial_port))
            self.port = serial_port

        start_time = time.time()
        self.logger.prn_inf("serial(port=%s, baudrate=%d, read_timeout=%s, write_timeout=%d)"% (self.port, self.baudrate, self.read_timeout, self.write_timeout))
        while time.time() - start_time < self.polling_timeout:
            try:
                # TIMEOUT: While creating Serial object timeout is deliberately passed as 0. Because blocking in Serial.read
                # impacts thread and multiprocess functioning in Python. Hence, instead in self.read() a delay (sleep()) is
                # inserted to let serial buffer collect data and avoid spinning on non blocking read().
                self.serial = Serial(self.port, baudrate=self.baudrate, timeout=0, write_timeout=self.write_timeout)
            except SerialException as e:
                self.serial = None
                # read_timeout is a fraction of a second, so it is formatted with %s (%d would print 0)
                self.LAST_ERROR = "connection lost, serial.Serial(%s, %d, %s, %d): %s"% (self.port,
                    self.baudrate,
                    self.read_timeout,
                    self.write_timeout,
                    str(e))
                self.logger.prn_err(str(e))
                self.logger.prn_err("Retry after 1 sec until %s seconds" % self.polling_timeout)
            else:
                # Port is open: reset the device (unless told not to) and stop retrying
                if not self.skip_reset:
                    self.reset_dev_via_serial(delay=self.forced_reset_timeout)
                break
            time.sleep(1)

    def reset_dev_via_serial(self, delay=1):
        """! Reset device using selected method, calls one of the reset plugins

        @param delay Time in seconds to sleep after the reset plugin was called,
                     falsy value skips the sleep
        @return Value returned by host_tests_plugins.call_plugin()
        @details Reset plugin name is read from config key 'reset_type', a missing
                 or empty value selects the 'default' plugin.
        """
        reset_type = self.config.get('reset_type', 'default')
        if not reset_type:
            reset_type = 'default'
        disk = self.config.get('disk', None)

        self.logger.prn_inf("reset device using '%s' plugin..."% reset_type)
        # Note: plugin gets the opened serial port object, not the port name
        result = host_tests_plugins.call_plugin('ResetMethod',
            reset_type,
            serial=self.serial,
            disk=disk,
            target_id=self.target_id,
            polling_timeout=self.config.get('polling_timeout'))
        # Post-reset sleep
        if delay:
            self.logger.prn_inf("waiting %.2f sec after reset"% delay)
            time.sleep(delay)
        self.logger.prn_inf("wait for it...")
        return result

    def read(self, count):
        """! Read data from serial port RX buffer

        @param count Maximum number of bytes requested from serial.read()
        @return Data returned by serial.read(count), or an empty str() when the
                port is not open or the read failed
        @details On SerialException the port handle is dropped and the reason is
                 stored in self.LAST_ERROR.
        """
        # TIMEOUT: Since read is called in a loop, wait for self.read_timeout period before calling serial.read(). See
        # comment on serial.Serial() call in the constructor about timeout.
        time.sleep(self.read_timeout)
        c = str()
        try:
            if self.serial:
                c = self.serial.read(count)
        except SerialException as e:
            self.serial = None
            self.LAST_ERROR = "connection lost, serial.read(%d): %s"% (count, str(e))
            self.logger.prn_err(str(e))
        return c

    def write(self, payload, log=False):
        """! Write data to serial port TX buffer

        @param payload Text to send, it is encoded as UTF-8 before writing
        @param log When True the payload is also reported via logger.prn_txd()
        @return True if payload was written, False when the port is not open or
                the write failed
        @details On SerialException the port handle is dropped and the reason is
                 stored in self.LAST_ERROR.
        """
        try:
            if self.serial:
                self.serial.write(payload.encode('utf-8'))
                if log:
                    self.logger.prn_txd(payload)
                return True
        except SerialException as e:
            self.serial = None
            self.LAST_ERROR = "connection lost, serial.write(%d bytes): %s"% (len(payload), str(e))
            self.logger.prn_err(str(e))
        return False

    def flush(self):
        """! Flush serial port, does nothing when the port is not open """
        if self.serial:
            self.serial.flush()

    def connected(self):
        """! Check if serial port handle is held
        @return True when self.serial is set, False otherwise
        """
        return bool(self.serial)

    def finish(self):
        """! Close serial port (if open) and drop its handle

        @details Safe to call more than once and on an instance whose constructor
                 did not complete. After this call connected() returns False.
        """
        # getattr(): __del__() may call us on an object whose __init__() raised
        # before self.serial was assigned
        serial_port = getattr(self, 'serial', None)
        if serial_port:
            try:
                serial_port.close()
            finally:
                # Drop the handle even if close() failed so it is not reused
                self.serial = None

    def reset(self):
        """! Reset device and wait 'forced_reset_timeout' seconds afterwards """
        self.reset_dev_via_serial(self.forced_reset_timeout)

    def __del__(self):
        """! Make sure serial port is closed when the object is destroyed """
        self.finish()