File: serial_can.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (69 lines) | stat: -rw-r--r-- 1,969 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
"""
Enable basic can over a serial device.

E.g. over bluetooth with "/dev/rfcomm0"

"""

import logging

logger = logging.getLogger('can.serial')

try:
    import serial
except ImportError:
    logger.error("You won't be able to use the serial can backend without the serial module installed!")
    serial = None

from can.bus import BusABC
from can.message import Message


class SerialBus(BusABC):

    def __init__(self, channel, *args, **kwargs):
        """A serial interface to CAN.

        :param str channel:
            The serial device to open.
        """
        if channel == '':
            raise TypeError("Must specify a serial port.")
        else:
            self.channel_info = "Serial interface: " + channel

            # Note: Some serial port implementations don't care about the baud rate
            self.ser = serial.Serial(channel, baudrate=115200, timeout=0.1)
        super(SerialBus, self).__init__(*args, **kwargs)

    def send(self, msg):
        raise NotImplementedError("This serial interface doesn't support transmit.")

    def recv(self, timeout=None):

        try:
            # ser.read can return an empty string ''
            # or raise a SerialException
            rx_byte = self.ser.read()
        except serial.SerialException:
            return None

        if len(rx_byte) and ord(rx_byte) == 0xAA:
            s = bytearray(self.ser.read(4))
            timestamp = s[0] + (s[1] << 8) + (s[2] << 16) + (s[3] << 24)
            dlc = ord(self.ser.read())

            s = bytearray(self.ser.read(4))
            arb_id = s[0] + (s[1] << 8) + (s[2] << 16) + (s[3] << 24)

            data = self.ser.read(dlc)

            rxd_byte = ord(self.ser.read())
            if rxd_byte == 0xBB:
                # received message data okay
                return Message(timestamp=timestamp, arbitration_id=arb_id, dlc=dlc, data=data)
            else:
                return None

        else:
            return None