File: csv.py

package info (click to toggle)
python-can 3.3.2.final~github-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 2,172 kB
  • sloc: python: 10,208; makefile: 30; sh: 12
file content (110 lines) | stat: -rw-r--r-- 3,905 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# coding: utf-8

"""
This module contains handling for CSV (comma separated values) files.

TODO: CAN FD messages are not yet supported.

TODO: This module could use https://docs.python.org/2/library/csv.html#module-csv
      to allow different delimiters for writing, special escape chars to circumvent
      the base64 encoding and use csv.Sniffer to automatically deduce the delimiters
      of a CSV file.
"""

from __future__ import absolute_import

from base64 import b64encode, b64decode

from can.message import Message
from can.listener import Listener
from .generic import BaseIOHandler


class CSVWriter(BaseIOHandler, Listener):
    """Listener that logs every received message as one line of a
    comma separated text file. Unless the writer appends to a file,
    the first line written is a header naming the columns.

    The columns, in order, are:

    ================ ======================= ===============
    name of column   format description      example
    ================ ======================= ===============
    timestamp        decimal float           1483389946.197
    arbitration_id   hex                     0xdadada
    extended         1 == True, 0 == False   1
    remote           1 == True, 0 == False   0
    error            1 == True, 0 == False   0
    dlc              int                     6
    data             base64 encoded          WzQyLCA5XQ==
    ================ ======================= ===============

    Every row is terminated by a single newline character written to
    the text-mode file; any translation to a platform specific line
    separator is left to the file object.
    """

    def __init__(self, file, append=False):
        """
        :param file: a path-like object or a file-like object to write to.
                     A file-like object has to be opened in text write
                     mode, not binary write mode.
        :param bool append: if `True`, the file is opened in append mode
                            and no header line is written; otherwise it
                            is opened in write mode and starts with a
                            freshly written header line
        """
        if append:
            mode = 'a'
        else:
            mode = 'w'
        super(CSVWriter, self).__init__(file, mode=mode)

        # Appending assumes the header is already there, so nothing more to do.
        if append:
            return
        self.file.write("timestamp,arbitration_id,extended,remote,error,dlc,data\n")

    def on_message_received(self, msg):
        """Serialize `msg` as one CSV row and write it to the file.

        :param msg: the message to log; its ``timestamp``,
                    ``arbitration_id``, ``is_extended_id``,
                    ``is_remote_frame``, ``is_error_frame``, ``dlc`` and
                    ``data`` attributes are read, in that order
        """
        # repr() instead of str(): str() may round the float timestamp
        timestamp = repr(msg.timestamp)
        arbitration_id = hex(msg.arbitration_id)
        extended = '1' if msg.is_extended_id else '0'
        remote = '1' if msg.is_remote_frame else '0'
        error = '1' if msg.is_error_frame else '0'
        dlc = str(msg.dlc)
        # base64 keeps the raw payload free of commas and line breaks
        payload = b64encode(msg.data).decode('utf8')

        columns = (timestamp, arbitration_id, extended, remote, error, dlc, payload)
        self.file.write(','.join(columns))
        self.file.write('\n')


class CSVReader(BaseIOHandler):
    """Iterator over CAN messages from a .csv file that was
    generated by :class:`~can.CSVWriter` or that uses the same
    format as described there. Assumes that there is a header
    and thus skips the first line. A completely empty file
    yields no messages.

    Any line separator is accepted.
    """

    def __init__(self, file):
        """
        :param file: a path-like object or a file-like object to read from.
                     If this is a file-like object, it has to be opened in
                     text read mode, not binary read mode.
        """
        super(CSVReader, self).__init__(file, mode='r')

    def __iter__(self):
        """Yield one :class:`Message` per data line of the file.

        The first line is treated as the header and discarded. After the
        last line has been consumed (or if the file turns out to be empty),
        ``self.stop()`` is called.

        :raises ValueError: if a line does not consist of exactly seven
                            comma separated fields or a numeric field
                            cannot be parsed
        """
        # Skip the header line. An empty file has no header at all; letting
        # the resulting StopIteration escape from a generator would surface
        # as a RuntimeError on Python 3.7+ (PEP 479) and would silently skip
        # the stop() call on older versions, so handle it explicitly.
        try:
            next(self.file)
        except StopIteration:
            self.stop()
            return

        for line in self.file:

            # The trailing line separator is still attached to `data`;
            # b64decode() discards characters outside the base64 alphabet
            # by default, so it does not need to be stripped here.
            timestamp, arbitration_id, extended, remote, error, dlc, data = line.split(',')

            yield Message(
                timestamp=float(timestamp),
                is_remote_frame=(remote == '1'),
                is_extended_id=(extended == '1'),
                is_error_frame=(error == '1'),
                arbitration_id=int(arbitration_id, base=16),
                dlc=int(dlc),
                data=b64decode(data),
            )

        # stop() is inherited from BaseIOHandler (not visible here);
        # presumably it closes the underlying file.
        self.stop()