File: logger.py

package info (click to toggle)
python-can 4.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky
  • size: 3,428 kB
  • sloc: python: 27,154; makefile: 31; sh: 16
file content (141 lines) | stat: -rw-r--r-- 3,908 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import argparse
import errno
import sys
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Union,
)

from can import BusState, Logger, SizedRotatingLogger
from can.cli import (
    _add_extra_args,
    _parse_additional_config,
    _set_logging_level_from_namespace,
    add_bus_arguments,
    create_bus_from_namespace,
)
from can.typechecking import TAdditionalCliArgs

if TYPE_CHECKING:
    from can.io import BaseRotatingLogger
    from can.io.generic import MessageWriter


def _parse_logger_args(
    args: list[str],
) -> tuple[argparse.Namespace, TAdditionalCliArgs]:
    """Parse command line arguments for logger script."""

    parser = argparse.ArgumentParser(
        description="Log CAN traffic, printing messages to stdout or to a "
        "given file.",
    )

    logger_group = parser.add_argument_group("logger arguments")

    logger_group.add_argument(
        "-f",
        "--file_name",
        dest="log_file",
        help="Path and base log filename, for supported types see can.Logger.",
        default=None,
    )

    logger_group.add_argument(
        "-a",
        "--append",
        dest="append",
        help="Append to the log file if it already exists.",
        action="store_true",
    )

    logger_group.add_argument(
        "-s",
        "--file_size",
        dest="file_size",
        type=int,
        help="Maximum file size in bytes. Rotate log file when size threshold "
        "is reached. (The resulting file sizes will be consistent, but are not "
        "guaranteed to be exactly what is specified here due to the rollover "
        "conditions being logger implementation specific.)",
        default=None,
    )

    logger_group.add_argument(
        "-v",
        action="count",
        dest="verbosity",
        help="""How much information do you want to see at the command line?
                        You can add several of these e.g., -vv is DEBUG""",
        default=2,
    )

    state_group = logger_group.add_mutually_exclusive_group(required=False)
    state_group.add_argument(
        "--active",
        help="Start the bus as active, this is applied by default.",
        action="store_true",
    )
    state_group.add_argument(
        "--passive", help="Start the bus as passive.", action="store_true"
    )

    # handle remaining arguments
    _add_extra_args(logger_group)

    # add bus options
    add_bus_arguments(parser, filter_arg=True)

    # print help message when no arguments were given
    if not args:
        parser.print_help(sys.stderr)
        raise SystemExit(errno.EINVAL)

    results, unknown_args = parser.parse_known_args(args)
    additional_config = _parse_additional_config([*results.extra_args, *unknown_args])
    return results, additional_config


def main() -> None:
    results, additional_config = _parse_logger_args(sys.argv[1:])
    bus = create_bus_from_namespace(results)
    _set_logging_level_from_namespace(results)

    if results.active:
        bus.state = BusState.ACTIVE
    elif results.passive:
        bus.state = BusState.PASSIVE

    print(f"Connected to {bus.__class__.__name__}: {bus.channel_info}")
    print(f"Can Logger (Started on {datetime.now()})")

    logger: Union[MessageWriter, BaseRotatingLogger]
    if results.file_size:
        logger = SizedRotatingLogger(
            base_filename=results.log_file,
            max_bytes=results.file_size,
            append=results.append,
            **additional_config,
        )
    else:
        logger = Logger(
            filename=results.log_file,
            append=results.append,
            **additional_config,
        )

    try:
        while True:
            msg = bus.recv(1)
            if msg is not None:
                logger(msg)
    except KeyboardInterrupt:
        pass
    finally:
        bus.shutdown()
        logger.stop()


if __name__ == "__main__":
    main()