File: asc.py

package info (click to toggle)
python-can 3.0.0%2Bgithub-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,892 kB
  • sloc: python: 8,014; makefile: 29; sh: 12
file content (224 lines) | stat: -rw-r--r-- 7,899 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# coding: utf-8

"""
Contains handling of ASC logging files.

Example .asc files:
    - https://bitbucket.org/tobylorenz/vector_asc/src/47556e1a6d32c859224ca62d075e1efcc67fa690/src/Vector/ASC/tests/unittests/data/CAN_Log_Trigger_3_2.asc?at=master&fileviewer=file-view-default
    - under `test/data/logfile.asc`
"""

from __future__ import absolute_import

from datetime import datetime
import time
import logging

from ..message import Message
from ..listener import Listener
from ..util import channel2int
from .generic import BaseIOHandler

# NOTE(review): presumably the flag bit marking an extended CAN frame; it is
# not referenced by the code visible in this module - confirm it is still needed.
CAN_MSG_EXT = 0x80000000
# Keeps the lower 29 bits of a value; ASCReader applies it to every
# arbitration ID it parses from a log line.
CAN_ID_MASK = 0x1FFFFFFF

# Module-level logger; ASCWriter uses it for debug output.
logger = logging.getLogger('can.io.asc')


class ASCReader(BaseIOHandler):
    """
    Iterates over the CAN messages stored in an ASC logging file.

    Timestamps are passed on exactly as they are found in the file.

    TODO: turn relative timestamps back to absolute form
    """

    def __init__(self, file):
        """
        :param file: a path-like object or a file-like object to read from.
                     If this is a file-like object, it has to be opened in
                     text read mode, not binary read mode.
        """
        super(ASCReader, self).__init__(file, mode='r')

    @staticmethod
    def _extract_can_id(str_can_id):
        """Split a textual CAN ID into its number and its extended flag.

        :param str str_can_id: the hexadecimal ID, with a trailing ``x``
                               (any case) if the ID is an extended one
        :return: a tuple ``(can_id, is_extended)``
        :raises ValueError: if the ID is not a valid hexadecimal number
        """
        is_extended = str_can_id[-1:].lower() == 'x'
        hex_digits = str_can_id[:-1] if is_extended else str_can_id
        return int(hex_digits, 16), is_extended

    def _parse_remote_frame(self, timestamp, channel, payload):
        """Build the message for a remote frame line (payload ends in ``r``)."""
        can_id_str, _unused = payload.split(None, 1)
        can_id_num, is_extended_id = self._extract_can_id(can_id_str)
        return Message(
            timestamp=timestamp,
            arbitration_id=can_id_num & CAN_ID_MASK,
            extended_id=is_extended_id,
            is_remote_frame=True,
            channel=channel
        )

    def _parse_data_frame(self, timestamp, channel, payload):
        """Build the message for a data frame line."""
        columns = payload.split(None, 4)
        if len(columns) == 5:
            # dlc > 0, so there is a trailing column holding the data bytes
            can_id_str, _, _, dlc, raw_bytes = columns
        else:
            # no data column: take everything up to the dlc and use an
            # empty payload (raises ValueError if columns are missing)
            can_id_str, _, _, dlc = columns
            raw_bytes = ''

        dlc = int(dlc)
        frame = bytearray(int(token, 16) for token in raw_bytes.split()[:dlc])
        can_id_num, is_extended_id = self._extract_can_id(can_id_str)

        return Message(
            timestamp=timestamp,
            arbitration_id=can_id_num & CAN_ID_MASK,
            extended_id=is_extended_id,
            is_remote_frame=False,
            dlc=dlc,
            data=frame,
            channel=channel
        )

    def __iter__(self):
        """Yield one message per error, remote or data frame line.

        Lines that do not start with a digit, statistic lines and lines
        whose channel column is not a number are skipped. The handler is
        stopped once the file has been read completely.
        """
        for raw_line in self.file:
            stripped = raw_line.strip()

            # every event line starts with its (numeric) timestamp
            if not (stripped and stripped[0].isdigit()):
                continue

            columns = stripped.split(None, 2)  # timestamp, channel, rest
            if len(columns) != 3:
                # e.g. an empty comment
                continue
            timestamp_str, channel, payload = columns

            timestamp = float(timestamp_str)
            try:
                # See ASCWriter
                channel = int(channel) - 1
            except ValueError:
                # not a channel number; kept as text and filtered out below
                pass

            event_tag = payload.strip()[:10]

            if event_tag == 'ErrorFrame':
                yield Message(timestamp=timestamp, is_error_frame=True,
                              channel=channel)
                continue

            if not isinstance(channel, int) or event_tag == 'Statistic:':
                continue

            if payload[-1:] in ('r', 'R'):
                yield self._parse_remote_frame(timestamp, channel, payload)
            else:
                yield self._parse_data_frame(timestamp, channel, payload)

        self.stop()


class ASCWriter(BaseIOHandler, Listener):
    """Logs CAN data to an ASCII log file (.asc).

    The measurement starts with the timestamp of the first registered message.
    If a message has a timestamp smaller than the previous one or None,
    it gets assigned the timestamp that was written for the last message.
    If the first message does not have a timestamp, it is set to zero.
    """

    FORMAT_MESSAGE = "{channel}  {id:<15} Rx   {dtype} {data}"
    # weekday, month name, day of month, 12-hour time, AM/PM, year
    FORMAT_DATE = "%a %b %d %I:%M:%S %p %Y"
    FORMAT_EVENT = "{timestamp: 9.4f} {message}\n"

    def __init__(self, file, channel=1):
        """
        :param file: a path-like object or a file-like object to write to.
                     If this is a file-like object, it has to be opened in
                     text write mode, not binary write mode.
        :param channel: a default channel to use when the message does not
                        have a channel set
        """
        super(ASCWriter, self).__init__(file, mode='w')
        self.channel = channel

        # write start of file header
        now = datetime.now().strftime(self.FORMAT_DATE)
        self.file.write("date %s\n" % now)
        self.file.write("base hex  timestamps absolute\n")
        self.file.write("internal events logged\n")

        # the last part is written with the timestamp of the first message
        self.header_written = False
        self.last_timestamp = None
        self.started = None

    def stop(self):
        """Write the trigger block footer (if the file is still open) and
        stop the underlying handler."""
        if not self.file.closed:
            self.file.write("End TriggerBlock\n")
        super(ASCWriter, self).stop()

    def log_event(self, message, timestamp=None):
        """Add a message to the log file.

        The first logged event also writes the "Begin Triggerblock" line and
        a "Start of measurement" event. Empty messages are ignored.

        :param str message: an arbitrary message
        :param float timestamp: the absolute timestamp of the event; if it is
                                None or smaller than the previously written
                                one, the previous timestamp is used instead
        """

        if not message: # if empty or None
            logger.debug("ASCWriter: ignoring empty message")
            return

        # this is the case for the very first message:
        if not self.header_written:
            self.last_timestamp = (timestamp or 0.0)
            self.started = self.last_timestamp
            formatted_date = time.strftime(self.FORMAT_DATE, time.localtime(self.last_timestamp))
            self.file.write("Begin Triggerblock %s\n" % formatted_date)
            self.header_written = True
            self.log_event("Start of measurement") # caution: this is a recursive call!

        # figure out the correct timestamp
        if timestamp is None or timestamp < self.last_timestamp:
            timestamp = self.last_timestamp

        # remember it, so that the next event is compared against the
        # timestamp that was actually written last (see class docstring)
        self.last_timestamp = timestamp

        # turn into relative timestamps if necessary
        if timestamp >= self.started:
            timestamp -= self.started

        line = self.FORMAT_EVENT.format(timestamp=timestamp, message=message)
        self.file.write(line)

    def on_message_received(self, msg):
        """Serialize a CAN message and append it to the log.

        Error frames are written as ``ErrorFrame`` events, remote frames
        with the ``r`` type and no data, everything else as a data frame
        with its DLC and payload bytes.

        :param msg: the message to log
        """

        channel = channel2int(msg.channel)
        if channel is None:
            channel = self.channel
        else:
            # Many interfaces start channel numbering at 0 which is invalid
            channel += 1

        if msg.is_error_frame:
            # use the message's own channel, just like for all other frames
            self.log_event("{}  ErrorFrame".format(channel), msg.timestamp)
            return

        if msg.is_remote_frame:
            dtype = 'r'
            data = []
        else:
            dtype = "d {}".format(msg.dlc)
            data = ["{:02X}".format(byte) for byte in msg.data]

        arb_id = "{:X}".format(msg.arbitration_id)
        if msg.is_extended_id:
            # extended IDs are marked with a trailing 'x'
            arb_id += 'x'

        serialized = self.FORMAT_MESSAGE.format(channel=channel,
                                                id=arb_id,
                                                dtype=dtype,
                                                data=' '.join(data))

        self.log_event(serialized, msg.timestamp)