1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
|
"""
This module contains the generic :class:`LogReader` as
well as :class:`MessageSync` which plays back messages
in the recorded order and time intervals.
"""
import gzip
import pathlib
import time
from typing import (
Any,
Dict,
Final,
Generator,
Iterable,
Tuple,
Type,
Union,
)
from .._entry_points import read_entry_points
from ..message import Message
from ..typechecking import AcceptedIOType, FileLike, StringPathLike
from .asc import ASCReader
from .blf import BLFReader
from .canutils import CanutilsLogReader
from .csv import CSVReader
from .generic import BinaryIOMessageReader, MessageReader
from .mf4 import MF4Reader
from .sqlite import SqliteReader
from .trc import TRCReader
#: A map of file suffixes to their corresponding
#: :class:`can.io.generic.MessageReader` class
MESSAGE_READERS: Final[Dict[str, Type[MessageReader]]] = {
".asc": ASCReader,
".blf": BLFReader,
".csv": CSVReader,
".db": SqliteReader,
".log": CanutilsLogReader,
".mf4": MF4Reader,
".trc": TRCReader,
}
def _update_reader_plugins() -> None:
"""Update available message reader plugins from entry points."""
for entry_point in read_entry_points("can.io.message_reader"):
if entry_point.key in MESSAGE_READERS:
continue
reader_class = entry_point.load()
if issubclass(reader_class, MessageReader):
MESSAGE_READERS[entry_point.key] = reader_class
def _get_logger_for_suffix(suffix: str) -> Type[MessageReader]:
"""Find MessageReader class for given suffix."""
try:
return MESSAGE_READERS[suffix]
except KeyError:
raise ValueError(f'No read support for unknown log format "{suffix}"') from None
def _decompress(
filename: StringPathLike,
) -> Tuple[Type[MessageReader], Union[str, FileLike]]:
"""
Return the suffix and io object of the decompressed file.
"""
suffixes = pathlib.Path(filename).suffixes
if len(suffixes) != 2:
raise ValueError(
f"No write support for unknown log format \"{''.join(suffixes)}\""
) from None
real_suffix = suffixes[-2].lower()
reader_type = _get_logger_for_suffix(real_suffix)
mode = "rb" if issubclass(reader_type, BinaryIOMessageReader) else "rt"
return reader_type, gzip.open(filename, mode)
def LogReader(filename: StringPathLike, **kwargs: Any) -> MessageReader: # noqa: N802
"""Find and return the appropriate :class:`~can.io.generic.MessageReader` instance
for a given file suffix.
The format is determined from the file suffix which can be one of:
* .asc :class:`can.ASCReader`
* .blf :class:`can.BLFReader`
* .csv :class:`can.CSVReader`
* .db :class:`can.SqliteReader`
* .log :class:`can.CanutilsLogReader`
* .mf4 :class:`can.MF4Reader`
(optional, depends on `asammdf <https://github.com/danielhrisca/asammdf>`_)
* .trc :class:`can.TRCReader`
Gzip compressed files can be used as long as the original
files suffix is one of the above (e.g. filename.asc.gz).
Exposes a simple iterator interface, to use simply::
for msg in can.LogReader("some/path/to/my_file.log"):
print(msg)
:param filename:
the filename/path of the file to read from
:raises ValueError:
if the filename's suffix is of an unknown file type
.. note::
There are no time delays, if you want to reproduce the measured
delays between messages look at the :class:`can.MessageSync` class.
.. note::
This function itself is just a dispatcher, and any positional and keyword
arguments are passed on to the returned instance.
"""
_update_reader_plugins()
suffix = pathlib.PurePath(filename).suffix.lower()
file_or_filename: AcceptedIOType = filename
if suffix == ".gz":
reader_type, file_or_filename = _decompress(filename)
else:
reader_type = _get_logger_for_suffix(suffix)
return reader_type(file=file_or_filename, **kwargs)
class MessageSync:
"""
Used to iterate over some given messages in the recorded time.
"""
def __init__(
self,
messages: Iterable[Message],
timestamps: bool = True,
gap: float = 0.0001,
skip: float = 60.0,
) -> None:
"""Creates an new **MessageSync** instance.
:param messages: An iterable of :class:`can.Message` instances.
:param timestamps: Use the messages' timestamps. If False, uses the *gap* parameter
as the time between messages.
:param gap: Minimum time between sent messages in seconds
:param skip: Skip periods of inactivity greater than this (in seconds).
Example::
import can
with can.LogReader("my_logfile.asc") as reader, can.Bus(interface="virtual") as bus:
for msg in can.MessageSync(messages=reader):
print(msg)
bus.send(msg)
"""
self.raw_messages = messages
self.timestamps = timestamps
self.gap = gap
self.skip = skip
def __iter__(self) -> Generator[Message, None, None]:
t_wakeup = playback_start_time = time.perf_counter()
recorded_start_time = None
t_skipped = 0.0
for message in self.raw_messages:
# Work out the correct wait time
if self.timestamps:
if recorded_start_time is None:
recorded_start_time = message.timestamp
t_wakeup = playback_start_time + (
message.timestamp - t_skipped - recorded_start_time
)
else:
t_wakeup += self.gap
sleep_period = t_wakeup - time.perf_counter()
if self.skip and sleep_period > self.skip:
t_skipped += sleep_period - self.skip
sleep_period = self.skip
if sleep_period > 1e-4:
time.sleep(sleep_period)
yield message
|