File: usb2canInterface.py

package info (click to toggle)
python-can 3.0.0%2Bgithub-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,892 kB
  • sloc: python: 8,014; makefile: 29; sh: 12
file content (154 lines) | stat: -rw-r--r-- 4,301 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# coding: utf-8

"""
This interface is for windows only, otherwise use socketCAN.
"""

from __future__ import absolute_import, division

import logging

from can import BusABC, Message
from .usb2canabstractionlayer import *

bootTimeEpoch = 0
try:
    import uptime
    import datetime

    bootTimeEpoch = (uptime.boottime() - datetime.datetime.utcfromtimestamp(0)).total_seconds()
except:
    bootTimeEpoch = 0

# Set up logging
log = logging.getLogger('can.usb2can')


def format_connection_string(deviceID, baudrate='500'):
    """setup the string for the device

    config = deviceID + '; ' + baudrate
    """
    return "%s; %s" % (deviceID, baudrate)


def message_convert_tx(msg):
    messagetx = CanalMsg()

    length = len(msg.data)
    messagetx.sizeData = length

    messagetx.id = msg.arbitration_id

    for i in range(length):
        messagetx.data[i] = msg.data[i]

    messagetx.flags = 0x80000000

    if msg.is_error_frame:
        messagetx.flags |= IS_ERROR_FRAME

    if msg.is_remote_frame:
        messagetx.flags |= IS_REMOTE_FRAME

    if msg.is_extended_id:
        messagetx.flags |= IS_ID_TYPE

    return messagetx


def message_convert_rx(messagerx):
    """convert the message from the CANAL type to pythoncan type"""
    ID_TYPE = bool(messagerx.flags & IS_ID_TYPE)
    REMOTE_FRAME = bool(messagerx.flags & IS_REMOTE_FRAME)
    ERROR_FRAME = bool(messagerx.flags & IS_ERROR_FRAME)

    msgrx = Message(timestamp=messagerx.timestamp,
                    is_remote_frame=REMOTE_FRAME,
                    extended_id=ID_TYPE,
                    is_error_frame=ERROR_FRAME,
                    arbitration_id=messagerx.id,
                    dlc=messagerx.sizeData,
                    data=messagerx.data[:messagerx.sizeData]
                    )

    return msgrx


class Usb2canBus(BusABC):
    """Interface to a USB2CAN Bus.

    :param str channel:
        The device's serial number. If not provided, Windows Management Instrumentation
        will be used to identify the first such device. The *kwarg* `serial` may also be
        used.

    :param int bitrate:
        Bitrate of channel in bit/s. Values will be limited to a maximum of 1000 Kb/s.
        Default is 500 Kbs

    :param int flags:
        Flags to directly pass to open function of the usb2can abstraction layer.
    """

    def __init__(self, channel, *args, **kwargs):

        self.can = Usb2CanAbstractionLayer()

        # set flags on the connection
        if 'flags' in kwargs:
            enable_flags = kwargs["flags"]
        else:
            enable_flags = 0x00000008

        # code to get the serial number of the device
        if 'serial' in kwargs:
            deviceID = kwargs["serial"]
        elif channel is not None:
            deviceID = channel
        else:
            from can.interfaces.usb2can.serial_selector import serial
            deviceID = serial()

        # get baudrate in b/s from bitrate or use default
        bitrate = kwargs.get("bitrate", 500000)
        # convert to kb/s (eg:500000 bitrate must be 500), max rate is 1000 kb/s
        baudrate = min(1000, int(bitrate/1000))

        connector = format_connection_string(deviceID, baudrate)

        self.handle = self.can.open(connector.encode('utf-8'), enable_flags)

    def send(self, msg, timeout=None):
        tx = message_convert_tx(msg)
        if timeout:
            self.can.blocking_send(self.handle, byref(tx), int(timeout * 1000))
        else:
            self.can.send(self.handle, byref(tx))

    def _recv_internal(self, timeout):

        messagerx = CanalMsg()

        if timeout == 0:
            status = self.can.receive(self.handle, byref(messagerx))

        else:
            time = 0 if timeout is None else int(timeout * 1000)
            status = self.can.blocking_receive(self.handle, byref(messagerx), time)

        if status == 0:
            rx = message_convert_rx(messagerx)
        elif status == 19 or status == 32:
            # CANAL_ERROR_RCV_EMPTY or CANAL_ERROR_TIMEOUT
            rx = None
        else:
            log.error('Canal Error %s', status)
            rx = None

        return rx, False

    def shutdown(self):
        """Shut down the device safely"""
        # TODO handle error
        status = self.can.close(self.handle)