File: usb2canabstractionlayer.py

package info (click to toggle)
python-can 3.0.0%2Bgithub-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,892 kB
  • sloc: python: 8,014; makefile: 29; sh: 12
file content (156 lines) | stat: -rw-r--r-- 4,145 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# coding: utf-8

"""
This wrapper is for Windows, or for direct access via the CANAL API.
SocketCAN is recommended on Unix/Linux systems.
"""

import can
from ctypes import *
from struct import *
import logging

# Module-level logger; every warning emitted by this file goes through it.
log = logging.getLogger('can.usb2can')

# type definitions
# Each name below is a plain alias for a ctypes type.  Note that ``filter``
# rebinds the builtin of the same name inside this module, and that several
# methods of Usb2CanAbstractionLayer reuse these names as parameter names.
flags = c_ulong
pConfigureStr = c_char_p
handle = c_long
timeout = c_ulong
filter = c_ulong

# flags mappings
# Distinct single-bit values (4, 2, 1).
# NOTE(review): presumably tested against / OR-ed into CanalMsg.flags --
# confirm against the CANAL API documentation.
IS_ERROR_FRAME = 4
IS_REMOTE_FRAME = 2
IS_ID_TYPE = 1


class CanalStatistics(Structure):
    """ctypes structure made of seven consecutive ``c_ulong`` counters.

    The field names (including the spellings ``ReceiveFrams`` and
    ``TransmistFrams``) are the attribute names exposed on instances and are
    therefore kept exactly as they are.

    NOTE(review): presumably this is the structure handed to
    ``Usb2CanAbstractionLayer.get_statistics`` -- confirm against the caller.
    """

    # All members share one type, so only the names need to be listed.
    _fields_ = list(
        (counter_name, c_ulong)
        for counter_name in (
            'ReceiveFrams',
            'TransmistFrams',
            'ReceiveData',
            'TransmitData',
            'Overruns',
            'BusWarnings',
            'BusOff',
        )
    )


# Short alias for the statistics structure.
stat = CanalStatistics


class CanalStatus(Structure):
    """ctypes structure with three ``c_ulong`` members and an 80-byte buffer.

    NOTE(review): presumably this is the structure handed to
    ``Usb2CanAbstractionLayer.get_status`` -- confirm against the caller.
    """

    _fields_ = [
        ('channel_status', c_ulong),
        ('lasterrorcode', c_ulong),
        ('lasterrorsubcode', c_ulong),
        # fixed-size array of 80 signed bytes
        ('lasterrorstr', c_byte * 80),
    ]


# ctypes layout of the CAN message
class CanalMsg(Structure):
    """ctypes structure describing one CAN message.

    ``data`` is a fixed array of eight unsigned bytes.

    NOTE(review): ``sizeData`` presumably holds the number of valid bytes in
    ``data`` -- confirm against the CANAL API documentation.
    """

    _fields_ = [
        ('flags', c_ulong),
        ('obid', c_ulong),
        ('id', c_ulong),
        ('sizeData', c_ubyte),
        ('data', c_ubyte * 8),
        ('timestamp', c_ulong),
    ]


class Usb2CanAbstractionLayer:
    """A low level wrapper around the usb2can library.

    Documentation: http://www.8devices.com/media/products/usb2can/downloads/CANAL_API.pdf

    Every public method forwards its arguments unchanged to one exported
    function of the loaded DLL and returns that function's result as-is; no
    return code is interpreted here.  If the call raises, a warning is logged
    and the exception is propagated (``send`` raises ``can.CanError``
    instead).  Only ``Exception`` subclasses are handled this way, so
    ``KeyboardInterrupt`` and ``SystemExit`` pass through untouched.
    """

    def __init__(self, dll="usb2can.dll"):
        """Load the driver library.

        :param dll: name or path handed to ``windll.LoadLibrary``; defaults
            to ``"usb2can.dll"``.
        :raises OSError: if the library cannot be loaded.
        """
        try:
            self.__m_dllBasic = windll.LoadLibrary(dll)
        except OSError:
            # LoadLibrary reports failure by raising, never by returning
            # None, so this is the only place the failure can be logged.
            log.warning('DLL failed to load')
            raise

    def _call(self, function_name, failure_message, *args):
        """Invoke the DLL export *function_name* with *args*.

        Returns whatever the export returns.  If anything raises, logs
        *failure_message* as a warning and re-raises the same exception.
        """
        try:
            # The attribute lookup stays inside the try block so that a
            # missing export is logged like any other failure.
            return getattr(self.__m_dllBasic, function_name)(*args)
        except Exception:
            log.warning(failure_message)
            raise

    def open(self, pConfigureStr, flags):
        """Return the result of ``CanalOpen(pConfigureStr, flags)``."""
        return self._call('CanalOpen', 'Failed to open', pConfigureStr, flags)

    def close(self, handle):
        """Return the result of ``CanalClose(handle)``."""
        return self._call('CanalClose', 'Failed to close', handle)

    def send(self, handle, msg):
        """Return the result of ``CanalSend(handle, msg)``.

        :raises can.CanError: if the call raises an ``Exception``.
        """
        try:
            return self.__m_dllBasic.CanalSend(handle, msg)
        except Exception:
            # Deliberately not a bare ``except``: an interrupt or interpreter
            # exit must not be reported as a transmit failure.
            log.warning('Sending error')
            raise can.CanError("Failed to transmit frame")

    def receive(self, handle, msg):
        """Return the result of ``CanalReceive(handle, msg)``."""
        return self._call('CanalReceive', 'Receive error', handle, msg)

    def blocking_send(self, handle, msg, timeout):
        """Return the result of ``CanalBlockingSend(handle, msg, timeout)``."""
        return self._call('CanalBlockingSend', 'Blocking send error',
                          handle, msg, timeout)

    def blocking_receive(self, handle, msg, timeout):
        """Return the result of ``CanalBlockingReceive(handle, msg, timeout)``."""
        return self._call('CanalBlockingReceive', 'Blocking Receive Failed',
                          handle, msg, timeout)

    def get_status(self, handle, CanalStatus):
        """Return the result of ``CanalGetStatus(handle, CanalStatus)``.

        The second parameter shadows the module-level ``CanalStatus`` class
        inside this method; it is whatever object the caller passes in.
        """
        return self._call('CanalGetStatus', 'Get status failed',
                          handle, CanalStatus)

    def get_statistics(self, handle, CanalStatistics):
        """Return the result of ``CanalGetStatistics(handle, CanalStatistics)``.

        The second parameter shadows the module-level ``CanalStatistics``
        class inside this method; it is whatever object the caller passes in.
        """
        return self._call('CanalGetStatistics', 'Get Statistics failed',
                          handle, CanalStatistics)

    def get_version(self):
        """Return the result of ``CanalGetVersion()``."""
        return self._call('CanalGetVersion', 'Failed to get version info')

    def get_library_version(self):
        """Return the result of ``CanalGetDllVersion()``."""
        return self._call('CanalGetDllVersion', 'Failed to get DLL version')

    def get_vendor_string(self):
        """Return the result of ``CanalGetVendorString()``."""
        return self._call('CanalGetVendorString', 'Failed to get vendor string')