File: player.py

package info (click to toggle)
python-can 4.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,372 kB
  • sloc: python: 25,840; makefile: 38; sh: 20
file content (111 lines) | stat: -rw-r--r-- 2,966 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Replays CAN traffic saved with can.logger back
to a CAN bus.

Similar to canplayer in the can-utils package.
"""

import argparse
import errno
import sys
from datetime import datetime
from typing import Iterable, cast

from can import LogReader, Message, MessageSync

from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config


def main() -> None:
    parser = argparse.ArgumentParser(description="Replay CAN traffic.")

    _create_base_argument_parser(parser)

    parser.add_argument(
        "-f",
        "--file_name",
        dest="log_file",
        help="Path and base log filename, for supported types see can.LogReader.",
        default=None,
    )

    parser.add_argument(
        "-v",
        action="count",
        dest="verbosity",
        help="""Also print can frames to stdout.
                        You can add several of these to enable debugging""",
        default=2,
    )

    parser.add_argument(
        "--ignore-timestamps",
        dest="timestamps",
        help="""Ignore timestamps (send all frames immediately with minimum gap between frames)""",
        action="store_false",
    )

    parser.add_argument(
        "--error-frames",
        help="Also send error frames to the interface.",
        action="store_true",
    )

    parser.add_argument(
        "-g",
        "--gap",
        type=float,
        help="<s> minimum time between replayed frames",
        default=0.0001,
    )
    parser.add_argument(
        "-s",
        "--skip",
        type=float,
        default=60 * 60 * 24,
        help="<s> skip gaps greater than 's' seconds",
    )

    parser.add_argument(
        "infile",
        metavar="input-file",
        type=str,
        help="The file to replay. For supported types see can.LogReader.",
    )

    # print help message when no arguments were given
    if len(sys.argv) < 2:
        parser.print_help(sys.stderr)
        raise SystemExit(errno.EINVAL)

    results, unknown_args = parser.parse_known_args()
    additional_config = _parse_additional_config([*results.extra_args, *unknown_args])

    verbosity = results.verbosity

    error_frames = results.error_frames

    with _create_bus(results, **additional_config) as bus:
        with LogReader(results.infile, **additional_config) as reader:
            in_sync = MessageSync(
                cast(Iterable[Message], reader),
                timestamps=results.timestamps,
                gap=results.gap,
                skip=results.skip,
            )

            print(f"Can LogReader (Started on {datetime.now()})")

            try:
                for message in in_sync:
                    if message.is_error_frame and not error_frames:
                        continue
                    if verbosity >= 3:
                        print(message)
                    bus.send(message)
            except KeyboardInterrupt:
                pass


if __name__ == "__main__":
    main()