File: bus.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (108 lines) | stat: -rw-r--r-- 3,041 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# -*- coding: utf-8 -*-
from __future__ import print_function

import abc
import logging
logger = logging.getLogger(__name__)


# Helper base class created by calling the metaclass directly.  Declaring the
# metaclass through a ``__metaclass__`` class attribute only works on
# Python 2; Python 3 silently ignores it, which would leave the
# ``@abc.abstractmethod`` markers below unenforced.  Inheriting from a class
# that was *built* by ``abc.ABCMeta`` gives the same metaclass on both
# major versions without any third-party compatibility shim.
_BusABCBase = abc.ABCMeta('_BusABCBase', (object,), {})


class BusABC(_BusABCBase):

    """CAN Bus Abstract Base Class

    Concrete implementations must implement the following methods:
        * __init__
        * send
        * recv

    As well as setting the `channel_info` attribute to a string describing the
    interface.

    Instantiating this class directly, or a subclass that does not override
    every abstract method, raises :class:`TypeError`.

    """

    #: a string describing the underlying bus channel
    channel_info = 'unknown'

    @abc.abstractmethod
    def __init__(self, channel=None, can_filters=None, **config):
        """
        :param channel:
            The can interface identifier. Expected type is backend dependent.

        :param list can_filters:
            A list of dictionaries each containing a "can_id" and a "can_mask".

            >>> [{"can_id": 0x11, "can_mask": 0x21}]

            A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask``

        :param dict config:
            Any backend dependent configurations are passed in this dictionary
        """

    @abc.abstractmethod
    def recv(self, timeout=None):
        """Block waiting for a message from the Bus.

        :param float timeout: Seconds to wait for a message.

        :return:
            None on timeout or a :class:`can.Message` object.

        :raise NotImplementedError:
            if a subclass delegates to this base implementation.
        """
        raise NotImplementedError("Trying to read from a write only bus?")

    @abc.abstractmethod
    def send(self, msg):
        """Transmit a message to CAN bus.
        Override this method to enable the transmit path.

        :param msg: A :class:`can.Message` object.

        :raise: :class:`can.CanError`
            if the message could not be written.

        :raise NotImplementedError:
            if a subclass delegates to this base implementation.
        """
        raise NotImplementedError("Trying to write to a readonly bus?")

    def __iter__(self):
        """Allow iteration on messages as they are received.

            >>> for msg in bus:
            ...     print(msg)

        The generator never terminates on its own: it polls :meth:`recv`
        with a one second timeout and silently retries whenever the poll
        times out (``recv`` returned ``None``).

        :yields: :class:`can.Message` msg objects.
        """
        while True:
            # A finite timeout keeps the loop from blocking forever inside
            # a single recv() call.
            message = self.recv(timeout=1.0)
            if message is not None:
                yield message

    def set_filters(self, can_filters=None):
        """Apply filtering to all messages received by this Bus.

        Calling without passing any filters will reset the applied filters.

        The base implementation does not support filtering and always
        raises; backends that can filter must override this method.

        :param list can_filters:
            A list of dictionaries each containing a "can_id" and a "can_mask".

            >>> [{"can_id": 0x11, "can_mask": 0x21}]

            A filter matches, when ``<received_can_id> & can_mask == can_id & can_mask``

        :raise NotImplementedError:
            always, unless overridden by the backend.
        """
        raise NotImplementedError("Trying to set_filters on unsupported bus")

    def flush_tx_buffer(self):
        """Used for CAN interfaces which need to flush their transmit buffer.

        The base implementation does nothing.
        """
        pass

    def shutdown(self):
        """
        Called to carry out any interface specific cleanup required
        in shutting down a bus.

        The base implementation only calls :meth:`flush_tx_buffer`.
        """
        self.flush_tx_buffer()