File: canlib.py

package info (click to toggle)
python-can 4.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,372 kB
  • sloc: python: 25,840; makefile: 38; sh: 20
file content (178 lines) | stat: -rw-r--r-- 5,844 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
from typing import Callable, List, Optional, Sequence, Union

import can.interfaces.ixxat.canlib_vcinpl as vcinpl
import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2
from can import (
    BusABC,
    BusState,
    CyclicSendTaskABC,
    Message,
)
from can.typechecking import AutoDetectedConfig


class IXXATBus(BusABC):
    """The CAN Bus implemented for the IXXAT interface.

    Based on the C implementation of IXXAT, two different dlls are provided by IXXAT, one to work with CAN,
    the other with CAN-FD.

    This class only delegates to related implementation (in calib_vcinpl or canlib_vcinpl2)
    class depending on fd user option.
    """

    def __init__(
        self,
        channel: int,
        can_filters=None,
        receive_own_messages: bool = False,
        unique_hardware_id: Optional[int] = None,
        extended: bool = True,
        fd: bool = False,
        rx_fifo_size: Optional[int] = None,
        tx_fifo_size: Optional[int] = None,
        bitrate: int = 500000,
        data_bitrate: int = 2000000,
        sjw_abr: Optional[int] = None,
        tseg1_abr: Optional[int] = None,
        tseg2_abr: Optional[int] = None,
        sjw_dbr: Optional[int] = None,
        tseg1_dbr: Optional[int] = None,
        tseg2_dbr: Optional[int] = None,
        ssp_dbr: Optional[int] = None,
        **kwargs,
    ):
        """
        :param channel:
            The Channel id to create this bus with.

        :param can_filters:
            See :meth:`can.BusABC.set_filters`.

        :param receive_own_messages:
            Enable self-reception of sent messages.

        :param unique_hardware_id:
            UniqueHardwareId to connect (optional, will use the first found if not supplied)

        :param extended:
            Default True, enables the capability to use extended IDs.

        :param fd:
            Default False, enables CAN-FD usage.

        :param rx_fifo_size:
            Receive fifo size (default 1024 for fd, else 16)

        :param tx_fifo_size:
            Transmit fifo size (default 128 for fd, else 16)

        :param bitrate:
            Channel bitrate in bit/s

        :param data_bitrate:
            Channel bitrate in bit/s (only in CAN-Fd if baudrate switch enabled).

        :param sjw_abr:
            Bus timing value sample jump width (arbitration). Only takes effect with fd enabled.

        :param tseg1_abr:
            Bus timing value tseg1 (arbitration). Only takes effect with fd enabled.

        :param tseg2_abr:
            Bus timing value tseg2 (arbitration). Only takes effect with fd enabled.

        :param sjw_dbr:
            Bus timing value sample jump width (data). Only takes effect with fd and baudrate switch enabled.

        :param tseg1_dbr:
            Bus timing value tseg1 (data). Only takes effect with fd and bitrate switch enabled.

        :param tseg2_dbr:
            Bus timing value tseg2 (data). Only takes effect with fd and bitrate switch enabled.

        :param ssp_dbr:
            Secondary sample point (data). Only takes effect with fd and bitrate switch enabled.

        """
        if fd:
            if rx_fifo_size is None:
                rx_fifo_size = 1024
            if tx_fifo_size is None:
                tx_fifo_size = 128
            self.bus = vcinpl2.IXXATBus(
                channel=channel,
                can_filters=can_filters,
                receive_own_messages=receive_own_messages,
                unique_hardware_id=unique_hardware_id,
                extended=extended,
                rx_fifo_size=rx_fifo_size,
                tx_fifo_size=tx_fifo_size,
                bitrate=bitrate,
                data_bitrate=data_bitrate,
                sjw_abr=sjw_abr,
                tseg1_abr=tseg1_abr,
                tseg2_abr=tseg2_abr,
                sjw_dbr=sjw_dbr,
                tseg1_dbr=tseg1_dbr,
                tseg2_dbr=tseg2_dbr,
                ssp_dbr=ssp_dbr,
                **kwargs,
            )
        else:
            if rx_fifo_size is None:
                rx_fifo_size = 16
            if tx_fifo_size is None:
                tx_fifo_size = 16
            self.bus = vcinpl.IXXATBus(
                channel=channel,
                can_filters=can_filters,
                receive_own_messages=receive_own_messages,
                unique_hardware_id=unique_hardware_id,
                extended=extended,
                rx_fifo_size=rx_fifo_size,
                tx_fifo_size=tx_fifo_size,
                bitrate=bitrate,
                **kwargs,
            )

        super().__init__(channel=channel, **kwargs)
        self._can_protocol = self.bus.protocol

    def flush_tx_buffer(self):
        """Flushes the transmit buffer on the IXXAT"""
        return self.bus.flush_tx_buffer()

    def _recv_internal(self, timeout):
        """Read a message from IXXAT device."""
        return self.bus._recv_internal(timeout)

    def send(self, msg: Message, timeout: Optional[float] = None) -> None:
        return self.bus.send(msg, timeout)

    def _send_periodic_internal(
        self,
        msgs: Union[Sequence[Message], Message],
        period: float,
        duration: Optional[float] = None,
        autostart: bool = True,
        modifier_callback: Optional[Callable[[Message], None]] = None,
    ) -> CyclicSendTaskABC:
        return self.bus._send_periodic_internal(
            msgs, period, duration, autostart, modifier_callback
        )

    def shutdown(self) -> None:
        super().shutdown()
        self.bus.shutdown()

    @property
    def state(self) -> BusState:
        """
        Return the current state of the hardware
        """
        return self.bus.state

    @staticmethod
    def _detect_available_configs() -> List[AutoDetectedConfig]:
        return vcinpl._detect_available_configs()