File: serial_device.py

package info (click to toggle)
python-alarmdecoder 1.13.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,036 kB
  • sloc: python: 3,719; javascript: 1,370; makefile: 147
file content (262 lines) | stat: -rw-r--r-- 7,327 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
"""
This module contains the :py:class:`SerialDevice` interface for the `AD2USB`_, `AD2SERIAL`_ or `AD2PI`_.

.. _AD2USB: http://www.alarmdecoder.com
.. _AD2SERIAL: http://www.alarmdecoder.com
.. _AD2PI: http://www.alarmdecoder.com

.. moduleauthor:: Scott Petersen <scott@nutech.com>
"""

import threading
import serial
import serial.tools.list_ports
import select
import sys
from .base_device import Device
from ..util import CommError, TimeoutError, NoDeviceError, bytes_hack, filter_ad2prot_byte


class SerialDevice(Device):
    """
    `AD2USB`_, `AD2SERIAL`_ or `AD2PI`_ device utilizing the PySerial interface.
    """

    # Constants
    BAUDRATE = 19200
    """Default baudrate for Serial devices."""

    @staticmethod
    def find_all(pattern=None):
        """
        Returns all serial ports present.

        :param pattern: pattern to search for when retrieving serial ports;
                        when empty or ``None`` every port is returned
        :type pattern: string

        :returns: list of devices
        :raises: :py:class:`~alarmdecoder.util.CommError`
        """
        try:
            if pattern:
                ports = serial.tools.list_ports.grep(pattern)
            else:
                ports = serial.tools.list_ports.comports()

            # The enumeration helpers may hand back a lazy iterable. Consume
            # it here so callers always receive a real list and so that any
            # enumeration error is raised inside this try block.
            return list(ports)

        except serial.SerialException as err:
            raise CommError('Error enumerating serial devices: {0}'.format(str(err)), err)

    @property
    def interface(self):
        """
        Retrieves the interface used to connect to the device.

        :returns: interface used to connect to the device
        """
        return self._port

    @interface.setter
    def interface(self, value):
        """
        Sets the interface used to connect to the device.

        The new value is used the next time :py:meth:`open` is called.

        :param value: name of the serial device
        :type value: string
        """
        self._port = value

    def __init__(self, interface=None):
        """
        Constructor

        Creates the underlying PySerial object without opening it; the port
        is only opened by :py:meth:`open`.

        :param interface: device to open
        :type interface: string
        """
        Device.__init__(self)

        self._port = interface
        self._id = interface
        # Timeout = non-blocking to match pyftdi.
        self._device = serial.Serial(timeout=0, writeTimeout=0)

    def open(self, baudrate=BAUDRATE, no_reader_thread=False):
        """
        Opens the device.

        :param baudrate: baudrate to use with the device; ``None`` selects
                         :py:attr:`BAUDRATE`
        :type baudrate: int
        :param no_reader_thread: whether or not to automatically start the
                                 reader thread.
        :type no_reader_thread: bool

        :returns: this device instance, to allow call chaining
        :raises: :py:class:`~alarmdecoder.util.NoDeviceError`
        """
        # Set up the defaults
        if baudrate is None:
            baudrate = SerialDevice.BAUDRATE

        if self._port is None:
            raise NoDeviceError('No device interface specified.')

        self._read_thread = Device.ReadThread(self)

        # Tracks whether *this* call opened the port, so the error path never
        # closes a port that was already open before we were called.
        opened_here = False

        # Open the device and start up the reader thread.
        try:
            self._device.port = self._port
            self._device.open()
            opened_here = True
            # NOTE: Setting the baudrate before opening the
            #       port caused issues with Moschip 7840/7820
            #       USB Serial Driver converter. (mos7840)
            #
            #       Moving it to this point seems to resolve
            #       all issues with it.
            self._device.baudrate = baudrate

        except (serial.SerialException, ValueError, OSError) as err:
            if opened_here:
                # The port opened but configuring it failed: release it so
                # the handle is not leaked and a later open() can succeed.
                try:
                    self._device.close()
                except Exception:
                    # The configuration failure is the error worth
                    # reporting; a secondary close failure must not mask it.
                    pass

            raise NoDeviceError('Error opening device on {0}.'.format(self._port), err)

        else:
            self._running = True
            self.on_open()

            if not no_reader_thread:
                self._read_thread.start()

        return self

    def close(self):
        """
        Closes the device.

        The actual work is delegated to ``Device.close``. Any exception it
        raises is suppressed, so this method never raises.
        """
        try:
            Device.close(self)

        except Exception:
            # Best-effort shutdown: errors while closing are ignored.
            pass

    def fileno(self):
        """
        Returns the file number associated with the device

        :returns: int
        """
        return self._device.fileno()

    def write(self, data):
        """
        Writes data to the device.

        Text is encoded as UTF-8 before being sent; any other value is
        handed to the serial port unchanged.  The ``on_write`` event fires
        only when the write completed without raising.  A write timeout is
        ignored silently and does not fire the event.

        :param data: data to write
        :type data: string

        :raises: :py:class:`~alarmdecoder.util.CommError`
        """
        try:
            # Hack to support unicode under Python 2.x
            if isinstance(data, str) or (sys.version_info < (3,) and isinstance(data, unicode)):
                data = data.encode('utf-8')

            self._device.write(data)

        except serial.SerialTimeoutException:
            pass

        except serial.SerialException as err:
            raise CommError('Error writing to device.', err)

        else:
            self.on_write(data=data)

    def read(self):
        """
        Reads a single character from the device.

        Waits up to half a second for the port to become readable.  The
        byte that was read is passed through ``filter_ad2prot_byte`` before
        being decoded.  If nothing became readable an empty string is
        returned.

        :returns: character read from the device
        :raises: :py:class:`~alarmdecoder.util.CommError`
        """
        data = b''

        try:
            read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)

            if read_ready:
                data = filter_ad2prot_byte(self._device.read(1))

        # OSError is handled as well so that low-level select/read failures
        # are reported the same way read_line() reports them.
        except (OSError, serial.SerialException) as err:
            raise CommError('Error reading from device: {0}'.format(str(err)), err)

        return data.decode('utf-8')

    def read_line(self, timeout=0.0, purge_buffer=False):
        """
        Reads a line from the device.

        Bytes are accumulated in the internal buffer until a newline is
        seen.  Lines that are empty once the trailing ``\\r\\n`` characters
        are stripped are skipped.  When ``timeout`` is not greater than
        zero no timer is armed and the call keeps waiting until a line
        arrives or an error occurs.  Because the port is polled in
        half-second steps, an expired timeout may be noticed up to that
        much late.

        :param timeout: read timeout
        :type timeout: float
        :param purge_buffer: Indicates whether to purge the buffer prior to
                             reading.
        :type purge_buffer: bool

        :returns: line that was read
        :raises: :py:class:`~alarmdecoder.util.CommError`, :py:class:`~alarmdecoder.util.TimeoutError`
        """
        if purge_buffer:
            self._buffer = b''

        # Set by the timer thread once the timeout has elapsed.
        expired = threading.Event()

        timer = None
        if timeout > 0:
            timer = threading.Timer(timeout, expired.set)
            timer.start()

        got_line, ret = False, None

        try:
            while not expired.is_set():
                read_ready, _, _ = select.select([self._device.fileno()], [], [], 0.5)

                if not read_ready:
                    continue

                buf = filter_ad2prot_byte(self._device.read(1))

                if buf == b'':
                    continue

                self._buffer += buf

                if buf == b"\n":
                    self._buffer = self._buffer.rstrip(b"\r\n")

                    if len(self._buffer) > 0:
                        got_line = True
                        break

        except (OSError, serial.SerialException) as err:
            raise CommError('Error reading from device: {0}'.format(str(err)), err)

        else:
            if not got_line:
                # Partial data stays in the buffer for the next call.
                raise TimeoutError('Timeout while waiting for line terminator.')

            ret, self._buffer = self._buffer, b''

            self.on_read(data=ret)

        finally:
            if timer is not None:
                timer.cancel()

        return ret.decode('utf-8')

    def purge(self):
        """
        Purges read/write buffers.
        """
        self._device.flushInput()
        self._device.flushOutput()