1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
|
"""
This module contains handling for CSV (comma separated values) files.
TODO: CAN FD messages are not yet supported.
TODO: This module could use https://docs.python.org/2/library/csv.html#module-csv
to allow different delimiters for writing, special escape chars to circumvent
the base64 encoding and use csv.Sniffer to automatically deduce the delimiters
of a CSV file.
"""
from base64 import b64decode, b64encode
from collections.abc import Generator
from typing import Any, TextIO, Union
from can.message import Message
from ..typechecking import StringPathLike
from .generic import TextIOMessageReader, TextIOMessageWriter
class CSVReader(TextIOMessageReader):
"""Iterator over CAN messages from a .csv file that was
generated by :class:`~can.CSVWriter` or that uses the same
format as described there. Assumes that there is a header
and thus skips the first line.
Any line separator is accepted.
"""
def __init__(
self,
file: Union[StringPathLike, TextIO],
**kwargs: Any,
) -> None:
"""
:param file: a path-like object or as file-like object to read from
If this is a file-like object, is has to opened in text
read mode, not binary read mode.
"""
super().__init__(file, mode="r")
def __iter__(self) -> Generator[Message, None, None]:
# skip the header line
try:
next(self.file)
except StopIteration:
# don't crash on a file with only a header
return
for line in self.file:
timestamp, arbitration_id, extended, remote, error, dlc, data = line.split(
","
)
yield Message(
timestamp=float(timestamp),
is_remote_frame=(remote == "1"),
is_extended_id=(extended == "1"),
is_error_frame=(error == "1"),
arbitration_id=int(arbitration_id, base=16),
dlc=int(dlc),
data=b64decode(data),
)
self.stop()
class CSVWriter(TextIOMessageWriter):
"""Writes a comma separated text file with a line for
each message. Includes a header line.
The columns are as follows:
================ ======================= ===============
name of column format description example
================ ======================= ===============
timestamp decimal float 1483389946.197
arbitration_id hex 0x00dadada
extended 1 == True, 0 == False 1
remote 1 == True, 0 == False 0
error 1 == True, 0 == False 0
dlc int 6
data base64 encoded WzQyLCA5XQ==
================ ======================= ===============
Each line is terminated with a platform specific line separator.
"""
def __init__(
self,
file: Union[StringPathLike, TextIO],
append: bool = False,
**kwargs: Any,
) -> None:
"""
:param file: a path-like object or a file-like object to write to.
If this is a file-like object, is has to open in text
write mode, not binary write mode.
:param bool append: if set to `True` messages are appended to
the file and no header line is written, else
the file is truncated and starts with a newly
written header line
"""
super().__init__(file, mode="a" if append else "w")
# Write a header row
if not append:
self.file.write("timestamp,arbitration_id,extended,remote,error,dlc,data\n")
def on_message_received(self, msg: Message) -> None:
row = ",".join(
[
repr(msg.timestamp), # cannot use str() here because that is rounding
hex(msg.arbitration_id),
"1" if msg.is_extended_id else "0",
"1" if msg.is_remote_frame else "0",
"1" if msg.is_error_frame else "0",
str(msg.dlc),
b64encode(msg.data).decode("utf8"),
]
)
self.file.write(row)
self.file.write("\n")
|