File: py_dlt_receive.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (74 lines) | stat: -rw-r--r-- 2,361 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# Copyright (C) 2017. BMW Car IT GmbH. All rights reserved.
"""DLT Receive using py_dlt"""

import argparse
import logging
import time

from dlt.dlt import DLT_UDP_MULTICAST_FD_BUFFER_SIZE, DLT_UDP_MULTICAST_BUFFER_SIZE
from dlt.dlt_broker import DLTBroker

# Install the log record format at import time; the log level itself is set in main().
logging.basicConfig(format="%(asctime)s %(name)s %(levelname)-8s %(message)s")
# Root logger, kept at module level so main() can set the level on it.
root_logger = logging.getLogger()  # pylint: disable=invalid-name
# Named logger used for this script's own progress messages.
logger = logging.getLogger("py-dlt-receive")  # pylint: disable=invalid-name


def parse_args():
    """Build the command line interface and parse the process arguments

    ``--host`` and ``--file`` are mandatory. The two udp multicast buffer size
    options are integers and fall back to the defaults imported from dlt.dlt.

    :returns: the argparse namespace with ``host``, ``file``,
        ``udp_fd_buffer_size`` and ``udp_buffer_size`` attributes
    """
    logger.info("Parsing arguments")

    # Help texts are prepared up front so the add_argument calls stay compact.
    fd_buffer_help = (
        f"Set the socket buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_FD_BUFFER_SIZE} bytes"
    )
    receiver_buffer_help = (
        f"Set the DltReceiver buffer size in udp multicast mode. default: {DLT_UDP_MULTICAST_BUFFER_SIZE} bytes"
    )

    cli = argparse.ArgumentParser(description="Receive DLT messages")
    cli.add_argument("--host", required=True, help="hostname or ip address to connect to")
    cli.add_argument("--file", required=True, help="The file into which the messages will be written")
    cli.add_argument(
        "--udp-fd-buffer-size",
        dest="udp_fd_buffer_size",
        type=int,
        default=DLT_UDP_MULTICAST_FD_BUFFER_SIZE,
        help=fd_buffer_help,
    )
    cli.add_argument(
        "--udp-buffer-size",
        dest="udp_buffer_size",
        type=int,
        default=DLT_UDP_MULTICAST_BUFFER_SIZE,
        help=receiver_buffer_help,
    )

    namespace = cli.parse_args()
    return namespace


def dlt_receive(options):
    """Receive DLT messages via DLTBroker

    Creates a DLTBroker for ``options.host`` with ``options.file`` as its
    filename, starts it and then idles until the process is interrupted
    (KeyboardInterrupt). The broker is stopped before this function returns,
    including when an unexpected exception escapes the wait loop.

    :param options: parsed command line options providing ``host``, ``file``,
        ``udp_fd_buffer_size`` and ``udp_buffer_size``
    """
    logger.info("Creating DLTBroker instance")
    # Each CLI option must feed the keyword of the same name: the "fd" size is
    # the socket buffer, the other one is the DltReceiver buffer. They were
    # previously crossed, so each flag silently configured the other buffer.
    broker = DLTBroker(
        ip_address=options.host,
        filename=options.file,
        udp_fd_buffer_size_bytes=options.udp_fd_buffer_size,
        udp_buffer_size_bytes=options.udp_buffer_size,
    )

    logger.info("Starting DLTBroker")
    broker.start()  # start the loop
    try:
        logger.info("Receiving messages...")
        # Nothing to do in this thread; just stay alive until interrupted.
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        logger.info("Interrupted...")
    finally:
        logger.info("Stopping DLT broker")
        broker.stop()
        logger.info("Stopped DLT broker")


def main():
    """Script entry point: set INFO logging, parse the command line, start receiving"""
    root_logger.setLevel(logging.INFO)

    cli_options = parse_args()
    logger.info("Parsed arguments: %s", cli_options)
    dlt_receive(cli_options)


# Run the receiver only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()