1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
import logging
from typing import Optional, Tuple
import usb
from gs_usb.constants import CAN_EFF_FLAG, CAN_ERR_FLAG, CAN_MAX_DLC, CAN_RTR_FLAG
from gs_usb.gs_usb import GsUsb
from gs_usb.gs_usb_frame import GS_USB_NONE_ECHO_ID, GsUsbFrame
import can
from ..exceptions import CanInitializationError, CanOperationError
logger = logging.getLogger(__name__)
class GsUsbBus(can.BusABC):
def __init__(
self,
channel,
bitrate: int = 500_000,
index=None,
bus=None,
address=None,
can_filters=None,
**kwargs,
):
"""
:param channel: usb device name
:param index: device number if using automatic scan, starting from 0.
If specified, bus/address shall not be provided.
:param bus: number of the bus that the device is connected to
:param address: address of the device on the bus it is connected to
:param can_filters: not supported
:param bitrate: CAN network bandwidth (bits/s)
"""
self._is_shutdown = False
if (index is not None) and ((bus or address) is not None):
raise CanInitializationError(
"index and bus/address cannot be used simultaneously"
)
if index is None and address is None and bus is None:
index = channel
self._index = None
if index is not None:
devs = GsUsb.scan()
if len(devs) <= index:
raise CanInitializationError(
f"Cannot find device {index}. Devices found: {len(devs)}"
)
gs_usb = devs[index]
self._index = index
else:
gs_usb = GsUsb.find(bus=bus, address=address)
if not gs_usb:
raise CanInitializationError(f"Cannot find device {channel}")
self.gs_usb = gs_usb
self.channel_info = channel
self._can_protocol = can.CanProtocol.CAN_20
bit_timing = can.BitTiming.from_sample_point(
f_clock=self.gs_usb.device_capability.fclk_can,
bitrate=bitrate,
sample_point=87.5,
)
props_seg = 1
self.gs_usb.set_timing(
prop_seg=props_seg,
phase_seg1=bit_timing.tseg1 - props_seg,
phase_seg2=bit_timing.tseg2,
sjw=bit_timing.sjw,
brp=bit_timing.brp,
)
self.gs_usb.start()
self._bitrate = bitrate
super().__init__(
channel=channel,
can_filters=can_filters,
**kwargs,
)
def send(self, msg: can.Message, timeout: Optional[float] = None):
"""Transmit a message to the CAN bus.
:param Message msg: A message object.
:param timeout: timeout is not supported.
The function won't return until message is sent or exception is raised.
:raises CanOperationError:
if the message could not be sent
"""
can_id = msg.arbitration_id
if msg.is_extended_id:
can_id = can_id | CAN_EFF_FLAG
if msg.is_remote_frame:
can_id = can_id | CAN_RTR_FLAG
if msg.is_error_frame:
can_id = can_id | CAN_ERR_FLAG
# Pad message data
msg.data.extend([0x00] * (CAN_MAX_DLC - len(msg.data)))
frame = GsUsbFrame()
frame.can_id = can_id
frame.can_dlc = msg.dlc
frame.timestamp_us = 0 # timestamp frame field is only useful on receive
frame.data = list(msg.data)
try:
self.gs_usb.send(frame)
except usb.core.USBError as exc:
raise CanOperationError("The message could not be sent") from exc
def _recv_internal(
self, timeout: Optional[float]
) -> Tuple[Optional[can.Message], bool]:
"""
Read a message from the bus and tell whether it was filtered.
This methods may be called by :meth:`~can.BusABC.recv`
to read a message multiple times if the filters set by
:meth:`~can.BusABC.set_filters` do not match and the call has
not yet timed out.
Never raises an error/exception.
:param float timeout: seconds to wait for a message,
see :meth:`~can.BusABC.send`
0 and None will be converted to minimum value 1ms.
:return:
1. a message that was read or None on timeout
2. a bool that is True if message filtering has already
been done and else False. In this interface it is always False
since filtering is not available
"""
frame = GsUsbFrame()
# Do not set timeout as None or zero here to avoid blocking
timeout_ms = round(timeout * 1000) if timeout else 1
if not self.gs_usb.read(frame=frame, timeout_ms=timeout_ms):
return None, False
msg = can.Message(
timestamp=frame.timestamp,
arbitration_id=frame.arbitration_id,
is_extended_id=frame.is_extended_id,
is_remote_frame=frame.is_remote_frame,
is_error_frame=frame.is_error_frame,
channel=self.channel_info,
dlc=frame.can_dlc,
data=bytearray(frame.data)[0 : frame.can_dlc],
is_rx=frame.echo_id == GS_USB_NONE_ECHO_ID,
)
return msg, False
def shutdown(self):
if self._is_shutdown:
return
super().shutdown()
self.gs_usb.stop()
if self._index is not None:
# Avoid errors on subsequent __init() by repeating the .scan() and .start() that would otherwise fail
# the next time the device is opened in __init__()
devs = GsUsb.scan()
if self._index < len(devs):
gs_usb = devs[self._index]
try:
gs_usb.set_bitrate(self._bitrate)
gs_usb.start()
gs_usb.stop()
except usb.core.USBError:
pass
self._is_shutdown = True
|