File: pdu.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (135 lines) | stat: -rw-r--r-- 4,239 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import logging
from can import Message

from .arbitrationid import ArbitrationID
from .constants import pgn_strings, PGN_AC_ADDRESS_CLAIMED
from .nodename import NodeName

# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)


class PDU(object):

    """
    A PDU is a higher level abstraction of a CAN message.
    J1939 ensures that long messages are taken care of.

    A PDU carries a timestamp, an arbitration id (always stored as an
    :class:`ArbitrationID` instance), a payload of byte values and a list
    of free-form info strings.

    Note: the class defines ``__eq__`` without ``__hash__``, so on Python 3
    instances are unhashable.
    """

    # Maximum number of payload bytes accepted by ``_check_data``.
    # NOTE(review): presumably 255 transport-protocol packets of 7 bytes
    # each -- confirm against the J1939 specification.
    MAX_DATA_LENGTH = 1785

    def __init__(self, timestamp=0.0, arbitration_id=None, data=None, info_strings=None):
        """
        :param float timestamp:
            Bus time in seconds.
        :param :class:`can.protocols.j1939.ArbitrationID` arbitration_id:
            ``None`` creates a default ``ArbitrationID()``; any other
            non-``ArbitrationID`` value is passed to the ``ArbitrationID``
            constructor.
        :param bytes/bytearray/list data:
            With length up to 1785. Defaults to an empty list.
        :param list info_strings:
            Free-form strings attached to the PDU. Defaults to an empty list.
        :raises AssertionError:
            If *data* is too long or contains values outside 0..255.
        """
        # Fresh containers per instance; never share a mutable default.
        if data is None:
            data = []
        if info_strings is None:
            info_strings = []
        self.timestamp = timestamp
        self.arbitration_id = arbitration_id
        self.data = self._check_data(data)
        self.info_strings = info_strings

    def __eq__(self, other):
        """Returns True if the pgn, data, source and destination are the same.

        Comparing against ``None`` returns False. Comparing against an
        object that lacks one of the compared attributes returns
        ``NotImplemented`` so Python can fall back to its default handling
        (which yields False for ``==``) instead of raising AttributeError.
        """
        if other is None:
            return False
        for field in ("pgn", "data", "source", "destination"):
            try:
                other_value = getattr(other, field)
            except AttributeError:
                # Not PDU-like: let the interpreter decide.
                return NotImplemented
            if getattr(self, field) != other_value:
                return False
        return True

    def __ne__(self, other):
        """Inverse of :meth:`__eq__`.

        Defined explicitly because Python 2 does not derive ``!=`` from
        ``__eq__``; without it ``!=`` would compare by identity there.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    @property
    def pgn(self):
        """PGN value taken from the arbitration id.

        When the arbitration id's PGN is destination specific the low byte
        is masked off (``& 0xFF00``); otherwise the value is returned as is.
        """
        pgn = self.arbitration_id.pgn
        if pgn.is_destination_specific:
            return pgn.value & 0xFF00
        return pgn.value

    @property
    def destination(self):
        """Destination address of the message"""
        return self.arbitration_id.destination_address

    @property
    def source(self):
        """Source address of the message"""
        return self.arbitration_id.source_address

    @property
    def is_address_claim(self):
        """True if this PDU's pgn equals ``PGN_AC_ADDRESS_CLAIMED``."""
        return self.pgn == PGN_AC_ADDRESS_CLAIMED

    @property
    def arbitration_id(self):
        """The :class:`ArbitrationID` of this PDU."""
        return self._arbitration_id

    @arbitration_id.setter
    def arbitration_id(self, other):
        # Normalise whatever the caller supplied into an ArbitrationID.
        if other is None:
            self._arbitration_id = ArbitrationID()
        elif not isinstance(other, ArbitrationID):
            self._arbitration_id = ArbitrationID(other)
        else:
            self._arbitration_id = other

    def _check_data(self, value):
        """Validate a payload and return it unchanged.

        :param value: sized iterable of byte values.
        :return: *value* itself.
        :raises AssertionError:
            If *value* holds more than ``MAX_DATA_LENGTH`` items or any item
            is outside 0..255.
        """
        # Raise explicitly rather than using ``assert`` statements: asserts
        # are removed under ``python -O`` which would silently disable the
        # validation. AssertionError is kept so existing callers that catch
        # it continue to work.
        length = len(value)
        if length > self.MAX_DATA_LENGTH:
            raise AssertionError(
                'Too much data to fit in a j1939 CAN message. Got {0} bytes'.format(length))
        if length > 0:
            if not (min(value) >= 0) or not (max(value) <= 255):
                raise AssertionError('Data values must be between 0 and 255')
        return value

    def data_segments(self, segment_length=8):
        """Split ``self.data`` into consecutive slices.

        :param int segment_length:
            Maximum number of items per slice; the final slice may be shorter.
        :return: list of slices of ``self.data`` (empty list for empty data).
        """
        # Slicing already clamps at the end of the sequence, so no explicit
        # bound on the slice end is required.
        return [self.data[start:start + segment_length]
                for start in range(0, len(self.data), segment_length)]

    def check_equality(self, other, fields, debug=False):
        """
        Compare the named attributes of this PDU and *other*.

        :param :class:`~can.protocols.j1939.PDU` other:
        :param list[str] fields:
            Attribute names to compare, in order.
        :param bool debug:
            If true, a ``"field: own, other"`` line is appended to
            ``self.info_strings`` for every field that is looked at.
        :return:
            True if every field exists on both objects and compares equal;
            False at the first missing or differing field.
        """

        logger.debug("check_equality starting")

        for field in fields:
            try:
                own_value = getattr(self, field)
            except AttributeError:
                logger.warning("'%s' not found in 'self'", field)
                return False

            try:
                other_value = getattr(other, field)
            except AttributeError:
                logger.debug("'%s' not found in 'other'", field)
                return False

            if debug:
                self.info_strings.append("%s: %s, %s" % (field, own_value, other_value))
            if own_value != other_value:
                return False

        logger.debug("Messages match")
        return True

    def __str__(self):
        """

        :return: A string representation of this message: the timestamp,
            the arbitration id and the data bytes separated by spaces.

        """
        # TODO group this into 8 bytes per line and line them up...
        # NOTE(review): bytes are rendered as zero-padded two-digit *decimal*
        # numbers; confirm that hexadecimal was not intended.
        data_string = " ".join("{:02d}".format(byte) for byte in self.data)
        return "{s.timestamp:15.6f}    {s.arbitration_id}    {data}".format(s=self, data=data_string)