File: interface.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (126 lines) | stat: -rw-r--r-- 5,057 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import can
from can.broadcastmanager import CyclicSendTaskABC, MultiRateCyclicSendTaskABC
from can.util import load_config, choose_socketcan_implementation

VALID_INTERFACES = set(['kvaser', 'serial', 'pcan', 'socketcan_native',
                        'socketcan_ctypes', 'socketcan', 'usb2can', 'ixxat',
                        'virtual'])


class Bus(object):
    """
    Instantiates a CAN Bus of the given `bustype`, falls back to reading a
    configuration file from default locations.
    """

    @classmethod
    def __new__(cls, other, channel=None, *args, **kwargs):
        """
        Takes the same arguments as :class:`can.BusABC` with the addition of:

        :param kwargs:
            Should contain a bustype key with a valid interface name.

        :raises: NotImplementedError if the bustype isn't recognized
        """
        if 'bustype' in kwargs:
            can.rc['interface'] = kwargs['bustype']
            del kwargs['bustype']

            if can.rc['interface'] == 'socketcan':
                can.rc['interface'] = choose_socketcan_implementation()

        if 'interface' not in can.rc or 'channel' not in can.rc or can.rc['interface'] is None:
            can.log.debug("Loading default configuration")
            # Load defaults
            can.rc = load_config()

        if can.rc['interface'] not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(can.rc['interface']))

        # Import the correct Bus backend
        interface = can.rc['interface']
        if interface == 'kvaser':
            from can.interfaces.kvaser import KvaserBus
            cls = KvaserBus
        elif interface == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import SocketcanCtypes_Bus
            cls = SocketcanCtypes_Bus
        elif interface == 'socketcan_native':
            from can.interfaces.socketcan_native import SocketcanNative_Bus
            cls = SocketcanNative_Bus
        elif interface == 'serial':
            from can.interfaces.serial.serial_can import SerialBus
            cls = SerialBus
        elif interface == 'pcan':
            from can.interfaces.pcan import PcanBus
            cls = PcanBus
        elif interface == 'usb2can':
            from can.interfaces.usb2canInterface import Usb2canBus
            cls = Usb2canBus
        elif interface == 'ixxat':
            from can.interfaces.ixxat import IXXATBus
            cls = IXXATBus
        elif interface == 'virtual':
            from can.interfaces.virtual import VirtualBus
            cls = VirtualBus
        else:
            raise NotImplementedError("CAN interface '{}' not supported".format(interface))

        if channel is None:
            channel = can.rc['channel']
        return cls(channel, **kwargs)


class CyclicSendTask(CyclicSendTaskABC):

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):

        # If can.rc doesn't look valid: load default
        if 'interface' not in can.rc or 'channel' not in can.rc:
            can.log.debug("Loading default configuration")
            can.rc = load_config()

        print(can.rc)
        if can.rc['interface'] not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(can.rc['interface']))

        # Import the correct implementation of CyclicSendTask
        if can.rc['interface'] == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import CyclicSendTask as _ctypesCyclicSendTask
            cls = _ctypesCyclicSendTask
        elif can.rc['interface'] == 'socketcan_native':
            from can.interfaces.socketcan_native import CyclicSendTask as _nativeCyclicSendTask
            cls = _nativeCyclicSendTask
        else:
            can.log.info("Current CAN interface doesn't support CyclicSendTask")

        return cls(channel, *args, **kwargs)


class MultiRateCyclicSendTask(MultiRateCyclicSendTaskABC):

    @classmethod
    def __new__(cls, other, channel, *args, **kwargs):

        # If can.rc doesn't look valid: load default
        if 'interface' not in can.rc or 'channel' not in can.rc:
            can.log.debug("Loading default configuration")
            can.rc = load_config()

        print(can.rc)
        if can.rc['interface'] not in VALID_INTERFACES:
            raise NotImplementedError('Invalid CAN Bus Type - {}'.format(can.rc['interface']))

        # Import the correct implementation of CyclicSendTask
        if can.rc['interface'] == 'socketcan_ctypes':
            from can.interfaces.socketcan_ctypes import MultiRateCyclicSendTask as _ctypesMultiRateCyclicSendTask
            cls = _ctypesMultiRateCyclicSendTask
        elif can.rc['interface'] == 'socketcan_native':
            from can.interfaces.socketcan_native import MultiRateCyclicSendTask as _nativeMultiRateCyclicSendTask
            cls = _nativeMultiRateCyclicSendTask
        else:
            can.log.info("Current CAN interface doesn't support CyclicSendTask")

        return cls(channel, *args, **kwargs)