File: usb2canInterface.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (139 lines) | stat: -rw-r--r-- 3,717 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# this interface is for windows only, otherwise use socketCAN

import logging

from can.bus import BusABC
from can.message import Message
from can.interfaces.usb2can import *
from can.interfaces.usb2canSerialFindWin import serial

bootTimeEpoch = 0
try:
    import uptime
    import datetime

    bootTimeEpoch = (uptime.boottime() - datetime.datetime.utcfromtimestamp(0)).total_seconds()
except:
    bootTimeEpoch = 0

# Set up logging
logging.basicConfig(level=logging.WARNING)
log = logging.getLogger('can.usb2can')


# setup the string for the device
def set_string(deviceID, baudrate='500'):
    # config = deviceID + '; ' + baudrate
    config = "%s; %s" % (deviceID, baudrate)

    return (config)


# TODO: Issue 36 with data being zeros or anything other than 8 must be fixed
def message_convert_tx(msg):
    messagetx = CanalMsg()

    length = len(msg.data)
    messagetx.sizeData = length

    messagetx.id = msg.arbitration_id

    for i in range(length):
        messagetx.data[i] = msg.data[i]

    messagetx.flags = 80000000

    if msg.is_error_frame:
        messagetx.flags |= IS_ERROR_FRAME

    if msg.is_remote_frame:
        messagetx.flags |= IS_REMOTE_FRAME

    if msg.id_type:
        messagetx.flags |= IS_ID_TYPE

    return messagetx


def message_convert_rx(messagerx):
    """convert the message from the CANAL type to pythoncan type"""
    ID_TYPE = bool(messagerx.flags & IS_ID_TYPE)
    REMOTE_FRAME = bool(messagerx.flags & IS_REMOTE_FRAME)
    ERROR_FRAME = bool(messagerx.flags & IS_ERROR_FRAME)

    msgrx = Message(timestamp=messagerx.timestamp,
                    is_remote_frame=REMOTE_FRAME,
                    extended_id=ID_TYPE,
                    is_error_frame=ERROR_FRAME,
                    arbitration_id=messagerx.id,
                    dlc=messagerx.sizeData,
                    data=messagerx.data[:messagerx.sizeData]
                    )

    return msgrx


# interface functions
class Usb2canBus(BusABC):
    def __init__(self, channel, *args, **kwargs):

        self.can = usb2can()

        enable_flags = c_ulong

        # set flags on the connection
        if 'flags' in kwargs:
            enable_flags = kwargs["flags"]

        else:
            enable_flags = 0x00000008

        # code to get the serial number of the device
        if 'serial' in kwargs:
            deviceID = kwargs["serial"]
        else:
            deviceID = serial()

        # set baudrate in kb/s from bitrate
        # (eg:500000 bitrate must be 500)
        if 'bitrate' in kwargs:
            br = kwargs["bitrate"]

            # max rate is 1000 kbps
            baudrate = max(1000, int(br/1000))
        # set default value
        else:
            baudrate = 500

        connector = set_string(deviceID, baudrate)

        self.handle = self.can.CanalOpen(connector, enable_flags)

    def send(self, msg):

        tx = message_convert_tx(msg)
        self.can.CanalSend(self.handle, byref(tx))

    def recv(self, timeout=None):

        messagerx = CanalMsg()

        if timeout is None:
            status = self.can.CanalReceive(self.handle, byref(messagerx))

        else:
            time = c_ulong
            time = timeout
            status = self.can.CanalBlockingReceive(self.handle, byref(messagerx), time)

        if status is 0:
            rx = message_convert_rx(messagerx)
        else:
            log.error('Canal Error %s', status)
            rx = None

        return rx

    def shutdown(self):
        """Shut down the device safely"""
        status = self.can.CanalClose(self.handle)