1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
|
# coding: utf-8
"""
Interface for slcan compatible interfaces (win32/linux).
.. note::
Linux users can use slcand or socketcan as well.
"""
from __future__ import absolute_import
import time
import logging
from can import BusABC, Message
logger = logging.getLogger(__name__)
try:
import serial
except ImportError:
logger.warning("You won't be able to use the slcan can backend without "
"the serial module installed!")
serial = None
class slcanBus(BusABC):
"""
slcan interface
"""
# the supported bitrates and their commands
_BITRATES = {
10000: 'S0',
20000: 'S1',
50000: 'S2',
100000: 'S3',
125000: 'S4',
250000: 'S5',
500000: 'S6',
750000: 'S7',
1000000: 'S8',
83300: 'S9'
}
_SLEEP_AFTER_SERIAL_OPEN = 2 # in seconds
LINE_TERMINATOR = b'\r'
def __init__(self, channel, ttyBaudrate=115200, bitrate=None,
btr=None, sleep_after_open=_SLEEP_AFTER_SERIAL_OPEN,
rtscts=False, **kwargs):
"""
:raise ValueError: if both *bitrate* and *btr* are set
:param str channel:
port of underlying serial or usb device (e.g. /dev/ttyUSB0, COM8, ...)
Must not be empty.
:param int ttyBaudrate:
baudrate of underlying serial or usb device
:param int bitrate:
Bitrate in bit/s
:param str btr:
BTR register value to set custom can speed
:param float poll_interval:
Poll interval in seconds when reading messages
:param float sleep_after_open:
Time to wait in seconds after opening serial connection
:param bool rtscts:
turn hardware handshake (RTS/CTS) on and off
"""
if not channel: # if None or empty
raise TypeError("Must specify a serial port.")
if '@' in channel:
(channel, ttyBaudrate) = channel.split('@')
self.serialPortOrig = serial.serial_for_url(
channel, baudrate=ttyBaudrate, rtscts=rtscts)
self._buffer = bytearray()
time.sleep(sleep_after_open)
if bitrate is not None and btr is not None:
raise ValueError("Bitrate and btr mutually exclusive.")
if bitrate is not None:
self.close()
if bitrate in self._BITRATES:
self.write(self._BITRATES[bitrate])
else:
raise ValueError("Invalid bitrate, choose one of " + (', '.join(self._BITRATES)) + '.')
if btr is not None:
self.close()
self.write("s" + btr)
self.open()
super(slcanBus, self).__init__(channel, ttyBaudrate=115200,
bitrate=None, rtscts=False, **kwargs)
def write(self, string):
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
self.serialPortOrig.flush()
def open(self):
self.write('O')
def close(self):
self.write('C')
def _recv_internal(self, timeout):
if timeout != self.serialPortOrig.timeout:
self.serialPortOrig.timeout = timeout
canId = None
remote = False
extended = False
frame = []
# First read what is already in the receive buffer
while (self.serialPortOrig.in_waiting and
self.LINE_TERMINATOR not in self._buffer):
self._buffer += self.serialPortOrig.read(1)
# If we still don't have a complete message, do a blocking read
if self.LINE_TERMINATOR not in self._buffer:
self._buffer += self.serialPortOrig.read_until(self.LINE_TERMINATOR)
if self.LINE_TERMINATOR not in self._buffer:
# Timed out
return None, False
readStr = self._buffer.decode()
del self._buffer[:]
if not readStr:
pass
elif readStr[0] == 'T':
# extended frame
canId = int(readStr[1:9], 16)
dlc = int(readStr[9])
extended = True
for i in range(0, dlc):
frame.append(int(readStr[10 + i * 2:12 + i * 2], 16))
elif readStr[0] == 't':
# normal frame
canId = int(readStr[1:4], 16)
dlc = int(readStr[4])
for i in range(0, dlc):
frame.append(int(readStr[5 + i * 2:7 + i * 2], 16))
elif readStr[0] == 'r':
# remote frame
canId = int(readStr[1:4], 16)
dlc = int(readStr[4])
remote = True
elif readStr[0] == 'R':
# remote extended frame
canId = int(readStr[1:9], 16)
dlc = int(readStr[9])
extended = True
remote = True
if canId is not None:
msg = Message(arbitration_id=canId,
is_extended_id=extended,
timestamp=time.time(), # Better than nothing...
is_remote_frame=remote,
dlc=dlc,
data=frame)
return msg, False
return None, False
def send(self, msg, timeout=None):
if timeout != self.serialPortOrig.write_timeout:
self.serialPortOrig.write_timeout = timeout
if msg.is_remote_frame:
if msg.is_extended_id:
sendStr = "R%08X%d" % (msg.arbitration_id, msg.dlc)
else:
sendStr = "r%03X%d" % (msg.arbitration_id, msg.dlc)
else:
if msg.is_extended_id:
sendStr = "T%08X%d" % (msg.arbitration_id, msg.dlc)
else:
sendStr = "t%03X%d" % (msg.arbitration_id, msg.dlc)
sendStr += "".join(["%02X" % b for b in msg.data])
self.write(sendStr)
def shutdown(self):
self.close()
self.serialPortOrig.close()
def fileno(self):
if hasattr(self.serialPortOrig, 'fileno'):
return self.serialPortOrig.fileno()
# Return an invalid file descriptor on Windows
return -1
|