File: j1939_logger.py

package info (click to toggle)
python-can 1.5.2-3
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 644 kB
  • ctags: 1,184
  • sloc: python: 4,373; makefile: 14
file content (129 lines) | stat: -rw-r--r-- 4,004 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
from __future__ import print_function

import argparse
import datetime
import textwrap
import json

import can


def parse_arguments():
    parser = argparse.ArgumentParser(
        description=textwrap.dedent("""\
        Log J1939 traffic, printing messages to stdout or to a given file.

        Values for SOURCE and PGN can be provided as either hex or decimals.
        e.g. 0xEE00 or 60928

        The interface or channel can also be loaded from
        a configuration file - see the README for detail.
        """),
        epilog="""Pull requests and issues
        https://bitbucket.org/hardbyte/python-can""",
        formatter_class=argparse.RawTextHelpFormatter
    )

    parser.add_argument("-v", action="count", dest="verbosity",
                        help=textwrap.dedent('''\
    command line verbosity
    How much information do you want to see at the command line?
    You can add several of these e.g., -vv is DEBUG'''), default=2)

    filter_group = parser.add_mutually_exclusive_group()
    filter_group.add_argument('--pgn',
                              help=textwrap.dedent('''\
    Filter messages with given Parameter Group Number (PGN).
    Can be passed multiple times. Only messages that match will
    be logged.'''), action="append")

    filter_group.add_argument('--source', help=textwrap.dedent('''\
    Only listen for messages from the given Source address
    Can be used more than once.'''), action="append")

    filter_group.add_argument('--filter',
                              type=argparse.FileType('r'),
                              help=textwrap.dedent('''\
    Provide a json file with filtering rules.

    An example file that subscribes to all messages from SRC=0
    and two particular PGNs from SRC=1:

    [
      {
        "source": 1,
        "pgn": 61475
      }
      {
        "source": 1,
        "pgn": 61474
      }
      {
        "source": 0
      }
    ]

    '''))

    parser.add_argument('-c', '--channel',
                        help=textwrap.dedent('''\
    Most backend interfaces require some sort of channel.
    For example with the serial interface the channel might be a rfcomm device: "/dev/rfcomm0"
    With the socketcan interfaces valid channel examples include: "can0", "vcan0".

    Alternatively the CAN_CHANNEL environment variable can be set.
    '''))

    parser.add_argument('-i', '--interface', dest="interface",
                        #choices=can.interface.VALID_INTERFACES,
                        help=textwrap.dedent('''\
    Specify the backend CAN interface to use.

    Valid choices:
        {}

    Alternatively the CAN_INTERFACE environment variable can be set.
    '''.format(can.interface.VALID_INTERFACES)))

    return parser.parse_args()


if __name__ == "__main__":

    args = parse_arguments()

    verbosity = args.verbosity
    logging_level_name = ['critical', 'error', 'warning', 'info', 'debug', 'subdebug'][min(5, verbosity)]
    can.set_logging_level(logging_level_name)

    from can.interfaces.interface import *
    from can.protocols import j1939

    filters = []
    if args.pgn is not None:
        print('Have to filter pgns: ', args.pgn)
        for pgn in args.pgn:
            if pgn.startswith('0x'):
                pgn = int(pgn[2:], base=16)

            filters.append({'pgn': int(pgn)})
    if args.source is not None:
        for src in args.source:
            if src.startswith("0x"):
                src = int(src[2:], base=16)
            filters.append({"source": int(src)})
    if args.filter is not None:
        filters = json.load(args.filter)
        #print("Loaded filters from file: ", filters)

    bus = j1939.Bus(channel=args.channel, bustype=args.interface, j1939_filters=filters)
    log_start_time = datetime.datetime.now()
    print('can.j1939 logger started on {}\n'.format(log_start_time))

    try:
        for msg in bus:
            print(msg)
    except KeyboardInterrupt:
        bus.shutdown()
        print()