1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
|
"""
Replays CAN traffic saved with can.logger back
to a CAN bus.
Similar to canplayer in the can-utils package.
"""
import argparse
import errno
import sys
from datetime import datetime
from typing import TYPE_CHECKING, cast
from can import LogReader, MessageSync
from can.cli import (
_add_extra_args,
_parse_additional_config,
_set_logging_level_from_namespace,
add_bus_arguments,
create_bus_from_namespace,
)
if TYPE_CHECKING:
from collections.abc import Iterable
from can import Message
def main() -> None:
parser = argparse.ArgumentParser(description="Replay CAN traffic.")
player_group = parser.add_argument_group("Player arguments")
player_group.add_argument(
"-f",
"--file_name",
dest="log_file",
help="Path and base log filename, for supported types see can.LogReader.",
default=None,
)
player_group.add_argument(
"-v",
action="count",
dest="verbosity",
help="""Also print can frames to stdout.
You can add several of these to enable debugging""",
default=2,
)
player_group.add_argument(
"--ignore-timestamps",
dest="timestamps",
help="""Ignore timestamps (send all frames immediately with minimum gap between frames)""",
action="store_false",
)
player_group.add_argument(
"--error-frames",
help="Also send error frames to the interface.",
action="store_true",
)
player_group.add_argument(
"-g",
"--gap",
type=float,
help="<s> minimum time between replayed frames",
default=0.0001,
)
player_group.add_argument(
"-s",
"--skip",
type=float,
default=60 * 60 * 24,
help="<s> skip gaps greater than 's' seconds",
)
player_group.add_argument(
"infile",
metavar="input-file",
type=str,
help="The file to replay. For supported types see can.LogReader.",
)
# handle remaining arguments
_add_extra_args(player_group)
# add bus options
add_bus_arguments(parser)
# print help message when no arguments were given
if len(sys.argv) < 2:
parser.print_help(sys.stderr)
raise SystemExit(errno.EINVAL)
results, unknown_args = parser.parse_known_args()
additional_config = _parse_additional_config([*results.extra_args, *unknown_args])
_set_logging_level_from_namespace(results)
verbosity = results.verbosity
error_frames = results.error_frames
with create_bus_from_namespace(results) as bus:
with LogReader(results.infile, **additional_config) as reader:
in_sync = MessageSync(
cast("Iterable[Message]", reader),
timestamps=results.timestamps,
gap=results.gap,
skip=results.skip,
)
print(f"Can LogReader (Started on {datetime.now()})")
try:
for message in in_sync:
if message.is_error_frame and not error_frames:
continue
if verbosity >= 3:
print(message)
bus.send(message)
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()
|