File: player.py

package info (click to toggle)
python-can 4.6.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,428 kB
  • sloc: python: 27,154; makefile: 32; sh: 16
file content (128 lines) | stat: -rw-r--r-- 3,359 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""
Replays CAN traffic saved with can.logger back
to a CAN bus.

Similar to canplayer in the can-utils package.
"""

import argparse
import errno
import sys
from datetime import datetime
from typing import TYPE_CHECKING, cast

from can import LogReader, MessageSync
from can.cli import (
    _add_extra_args,
    _parse_additional_config,
    _set_logging_level_from_namespace,
    add_bus_arguments,
    create_bus_from_namespace,
)

if TYPE_CHECKING:
    from collections.abc import Iterable

    from can import Message


def main() -> None:
    parser = argparse.ArgumentParser(description="Replay CAN traffic.")

    player_group = parser.add_argument_group("Player arguments")

    player_group.add_argument(
        "-f",
        "--file_name",
        dest="log_file",
        help="Path and base log filename, for supported types see can.LogReader.",
        default=None,
    )

    player_group.add_argument(
        "-v",
        action="count",
        dest="verbosity",
        help="""Also print can frames to stdout.
                        You can add several of these to enable debugging""",
        default=2,
    )

    player_group.add_argument(
        "--ignore-timestamps",
        dest="timestamps",
        help="""Ignore timestamps (send all frames immediately with minimum gap between frames)""",
        action="store_false",
    )

    player_group.add_argument(
        "--error-frames",
        help="Also send error frames to the interface.",
        action="store_true",
    )

    player_group.add_argument(
        "-g",
        "--gap",
        type=float,
        help="<s> minimum time between replayed frames",
        default=0.0001,
    )
    player_group.add_argument(
        "-s",
        "--skip",
        type=float,
        default=60 * 60 * 24,
        help="<s> skip gaps greater than 's' seconds",
    )

    player_group.add_argument(
        "infile",
        metavar="input-file",
        type=str,
        help="The file to replay. For supported types see can.LogReader.",
    )

    # handle remaining arguments
    _add_extra_args(player_group)

    # add bus options
    add_bus_arguments(parser)

    # print help message when no arguments were given
    if len(sys.argv) < 2:
        parser.print_help(sys.stderr)
        raise SystemExit(errno.EINVAL)

    results, unknown_args = parser.parse_known_args()
    additional_config = _parse_additional_config([*results.extra_args, *unknown_args])

    _set_logging_level_from_namespace(results)
    verbosity = results.verbosity

    error_frames = results.error_frames

    with create_bus_from_namespace(results) as bus:
        with LogReader(results.infile, **additional_config) as reader:
            in_sync = MessageSync(
                cast("Iterable[Message]", reader),
                timestamps=results.timestamps,
                gap=results.gap,
                skip=results.skip,
            )

            print(f"Can LogReader (Started on {datetime.now()})")

            try:
                for message in in_sync:
                    if message.is_error_frame and not error_frames:
                        continue
                    if verbosity >= 3:
                        print(message)
                    bus.send(message)
            except KeyboardInterrupt:
                pass


if __name__ == "__main__":
    main()