File: iscan.py

package info (click to toggle)
python-can 4.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,372 kB
  • sloc: python: 25,840; makefile: 38; sh: 20
file content (217 lines) | stat: -rw-r--r-- 6,348 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
"""
Interface for isCAN from *Thorsis Technologies GmbH*, former *ifak system GmbH*.
"""

import ctypes
import logging
import time
from typing import Optional, Tuple, Union

from can import (
    BusABC,
    CanError,
    CanInitializationError,
    CanInterfaceNotImplementedError,
    CanOperationError,
    CanProtocol,
    Message,
)

# Module-level logger; used below to report a failure to load the driver.
logger = logging.getLogger(__name__)

# Fixed-size array type holding the 8 data bytes of a message structure.
CanData = ctypes.c_ubyte * 8


class MessageExStruct(ctypes.Structure):
    """Message structure exchanged with the isCAN driver.

    Instances are passed by reference to ``isCAN_ReceiveMessageEx`` and
    ``isCAN_TransmitMessageEx``.
    """

    # NOTE(review): field order and types presumably mirror the driver's own
    # struct definition -- do not reorder without checking the vendor header.
    _fields_ = [
        ("message_id", ctypes.c_ulong),  # arbitration ID of the frame
        ("is_extended", ctypes.c_ubyte),  # non-zero for an extended ID
        ("remote_req", ctypes.c_ubyte),  # non-zero for a remote frame
        ("data_len", ctypes.c_ubyte),  # number of valid bytes in ``data``
        ("data", CanData),  # payload buffer (8 bytes)
    ]


def check_status_initialization(result: int, function, arguments) -> int:
    """ctypes ``errcheck`` hook used for device initialization calls.

    :param result:
        Status code returned by the driver function
    :param function:
        The driver function that was called
    :param arguments:
        The arguments the function was called with
    :return:
        ``result`` unchanged when it is not positive
    :raises IscanInitializationError:
        If ``result`` is greater than zero
    """
    if result <= 0:
        return result
    raise IscanInitializationError(function, result, arguments)


def check_status(result: int, function, arguments) -> int:
    """ctypes ``errcheck`` hook used for regular driver calls.

    :param result:
        Status code returned by the driver function
    :param function:
        The driver function that was called
    :param arguments:
        The arguments the function was called with
    :return:
        ``result`` unchanged when it is not positive
    :raises IscanOperationError:
        If ``result`` is greater than zero
    """
    if result <= 0:
        return result
    raise IscanOperationError(function, result, arguments)


# Load the driver library at import time. A missing driver is not fatal here:
# ``iscan`` is set to None, a warning is logged, and IscanBus raises
# CanInterfaceNotImplementedError when it is instantiated.
try:
    iscan = ctypes.cdll.LoadLibrary("iscandrv")
except OSError as e:
    iscan = None
    logger.warning("Failed to load IS-CAN driver: %s", e)
else:
    # Declare the return type of each driver function used by this module and
    # attach an errcheck hook so that a positive status code is raised as an
    # exception instead of being returned to the caller.

    # Initialization failures are reported as IscanInitializationError.
    iscan.isCAN_DeviceInitEx.argtypes = [ctypes.c_ubyte, ctypes.c_ubyte]
    iscan.isCAN_DeviceInitEx.errcheck = check_status_initialization
    iscan.isCAN_DeviceInitEx.restype = ctypes.c_ubyte

    # All other calls report failures as IscanOperationError.
    iscan.isCAN_ReceiveMessageEx.errcheck = check_status
    iscan.isCAN_ReceiveMessageEx.restype = ctypes.c_ubyte

    iscan.isCAN_TransmitMessageEx.errcheck = check_status
    iscan.isCAN_TransmitMessageEx.restype = ctypes.c_ubyte

    iscan.isCAN_CloseDevice.errcheck = check_status
    iscan.isCAN_CloseDevice.restype = ctypes.c_ubyte


class IscanBus(BusABC):
    """isCAN interface

    The device is opened through the ``iscandrv`` driver library when the bus
    is constructed, received messages are obtained by polling the driver, and
    the device is closed again in :meth:`shutdown`.
    """

    #: Maps a bitrate in bits/s to the code passed to ``isCAN_DeviceInitEx``.
    BAUDRATES = {
        5000: 0,
        10000: 1,
        20000: 2,
        50000: 3,
        100000: 4,
        125000: 5,
        250000: 6,
        500000: 7,
        800000: 8,
        1000000: 9,
    }

    def __init__(
        self,
        channel: Union[str, int],
        bitrate: int = 500000,
        poll_interval: float = 0.01,
        **kwargs,
    ) -> None:
        """
        :param channel:
            Device number
        :param bitrate:
            Bitrate in bits/s
        :param poll_interval:
            Poll interval in seconds when reading messages

        :raises CanInterfaceNotImplementedError:
            If the isCAN driver library could not be loaded
        :raises ValueError:
            If ``bitrate`` is not one of the keys of :attr:`BAUDRATES`
        :raises IscanInitializationError:
            If the driver reports an error while initializing the device
        """
        if iscan is None:
            raise CanInterfaceNotImplementedError("Could not load isCAN driver")

        self.channel = ctypes.c_ubyte(int(channel))
        # Format the plain integer; formatting the ctypes object itself would
        # yield its repr ("c_ubyte(0)") instead of the device number.
        self.channel_info = f"IS-CAN: {self.channel.value}"
        self._can_protocol = CanProtocol.CAN_20

        if bitrate not in self.BAUDRATES:
            raise ValueError(f"Invalid bitrate, choose one of {set(self.BAUDRATES)}")

        self.poll_interval = poll_interval
        iscan.isCAN_DeviceInitEx(self.channel, self.BAUDRATES[bitrate])

        super().__init__(
            channel=channel,
            bitrate=bitrate,
            poll_interval=poll_interval,
            **kwargs,
        )

    def _recv_internal(
        self, timeout: Optional[float]
    ) -> Tuple[Optional[Message], bool]:
        """Poll the driver until a message arrives or the timeout expires.

        :param timeout:
            Maximum time to wait in seconds, or ``None`` to wait indefinitely
        :return:
            ``(message, False)`` when a message was received, or
            ``(None, False)`` when no message arrived within ``timeout``
        :raises IscanOperationError:
            If the driver reports any error other than "No message received"
        """
        raw_msg = MessageExStruct()
        # Use the monotonic clock for the deadline so that adjustments of the
        # wall clock can neither shorten nor extend the timeout.
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            try:
                iscan.isCAN_ReceiveMessageEx(self.channel, ctypes.byref(raw_msg))
            except IscanError as e:
                if e.error_code != 8:  # "No message received"
                    # An error occurred
                    raise
                if deadline is not None and time.monotonic() > deadline:
                    # No message within timeout
                    return None, False
                # Sleep a short time to avoid hammering
                time.sleep(self.poll_interval)
            else:
                # A message was received
                break

        msg = Message(
            arbitration_id=raw_msg.message_id,
            is_extended_id=bool(raw_msg.is_extended),
            timestamp=time.time(),  # Better than nothing...
            is_remote_frame=bool(raw_msg.remote_req),
            dlc=raw_msg.data_len,
            data=raw_msg.data[: raw_msg.data_len],
            channel=self.channel.value,
        )
        return msg, False

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        """Hand a message to the driver for transmission.

        :param msg:
            Message to transmit
        :param timeout:
            Accepted for interface compatibility; not used by this
            implementation
        :raises IscanOperationError:
            If the driver reports an error
        """
        raw_msg = MessageExStruct(
            msg.arbitration_id,
            bool(msg.is_extended_id),
            bool(msg.is_remote_frame),
            msg.dlc,
            CanData(*msg.data),
        )
        iscan.isCAN_TransmitMessageEx(self.channel, ctypes.byref(raw_msg))

    def shutdown(self) -> None:
        """Run the base class shutdown, then close the device.

        :raises IscanOperationError:
            If the driver reports an error while closing the device
        """
        super().shutdown()
        iscan.isCAN_CloseDevice(self.channel)


class IscanError(CanError):
    """Error reported by the isCAN driver.

    Carries the status code, the driver function that failed and the
    arguments it was called with.
    """

    #: Descriptions of the driver status codes known to this module.
    ERROR_CODES = {
        0: "Success",
        1: "No access to device",
        2: "Device with ID not found",
        3: "Driver operation failed",
        4: "Invalid parameter",
        5: "Operation allowed only in online state",
        6: "Device timeout",
        7: "Device is transmitting a message",
        8: "No message received",
        9: "Thread not started",
        10: "Thread already started",
        11: "Buffer overrun",
        12: "Device not initialized",
        15: "Found the device, but it is being used by another process",
        16: "Bus error",
        17: "Bus off",
        18: "Error passive",
        19: "Data overrun",
        20: "Error warning",
        30: "Send error",
        31: "Transmission not acknowledged on bus",
        32: "Error critical bus",
        35: "Callbackthread is blocked, stopping thread failed",
        40: "Need a licence number under NT4",
    }

    def __init__(self, function, error_code: int, arguments) -> None:
        """
        :param function:
            Driver function that failed; its ``__name__`` is used in the message
        :param error_code:
            Status code returned by the driver
        :param arguments:
            Arguments the function was called with
        """
        # Codes without a known description get a message without a suffix.
        if error_code in self.ERROR_CODES:
            suffix = ": " + self.ERROR_CODES[error_code]
        else:
            suffix = ""

        message = f"Function {function.__name__} failed{suffix}"
        super().__init__(message, error_code=error_code)

        #: Arguments the failing function was called with
        self.arguments = arguments
        #: The driver function that failed
        self.function = function
        #: Status code returned by the driver
        self.error_code = error_code


class IscanOperationError(IscanError, CanOperationError):
    """isCAN driver error raised by the ``check_status`` error checker."""


class IscanInitializationError(IscanError, CanInitializationError):
    """isCAN driver error raised by the ``check_status_initialization`` error checker."""