File: core_21810.py

package info (click to toggle)
python-dlt 2.18.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 452 kB
  • sloc: python: 3,449; makefile: 55
file content (150 lines) | stat: -rw-r--r-- 5,955 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# Copyright (C) 2022. BMW CTW PT. All rights reserved.
"""v2.18.10 specific class definitions"""
import ctypes
import logging

from dlt.core.core_base import dltlib

# --- DltClientMode enumerators (dlt_client.h) ---
DLT_CLIENT_MODE_UNDEFINED = -1
DLT_CLIENT_MODE_TCP = 0
DLT_CLIENT_MODE_SERIAL = 1
DLT_CLIENT_MODE_UNIX = 2
DLT_CLIENT_MODE_UDP_MULTICAST = 3

# --- DltReceiverType enumerators (dlt_common.h) ---
DLT_RECEIVE_SOCKET = 0
DLT_RECEIVE_UDP_SOCKET = 1
DLT_RECEIVE_FD = 2

# --- Dimensions of the filter table declared in cDLTFilter ---
DLT_ID_SIZE = 4  # Bytes per application/context id entry
DLT_FILTER_MAX = 30  # Maximum number of filters

# --- Return codes ---
# Value that cDLTFilter.add() treats as failure of dlt_filter_add()
DLT_RETURN_ERROR = -1
# Returned by cDLTFilter.add(): the maximum number of filters was exceeded
MAX_FILTER_REACHED = 1
# Returned by cDLTFilter.add(): the specified filter already exists
REPEATED_FILTER = 2

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


class sockaddr_in(ctypes.Structure):  # pylint: disable=invalid-name
    """ctypes stand-in for ``struct sockaddr_in`` (netinet/in.h).

    Only needed so that cDltReceiver can embed a socket address of the right
    size. The four fields below add up to 16 bytes.
    """

    _fields_ = [
        # Address family; the C member is called sin_family
        ("sa_family", ctypes.c_ushort),
        # Port number
        ("sin_port", ctypes.c_ushort),
        # Address stored as 4 raw bytes
        ("sin_addr", ctypes.c_byte * 4),
        # Filler bringing the structure up to 16 bytes
        ("__pad", ctypes.c_byte * 8),
    ]


class cDltReceiver(ctypes.Structure):  # pylint: disable=invalid-name
    """ctypes mirror of the C ``DltReceiver`` structure.

    The structure organises the receiving of data, including buffer handling,
    and is used by the corresponding library functions. ``_fields_`` follows
    the C declaration below member by member; keep both in the same order.

    typedef struct
    {
        int32_t lastBytesRcvd;    /**< bytes received in last receive call */
        int32_t bytesRcvd;        /**< received bytes */
        int32_t totalBytesRcvd;   /**< total number of received bytes */
        char *buffer;             /**< pointer to receiver buffer */
        char *buf;                /**< pointer to position within receiver buffer */
        char *backup_buf;         /** pointer to the buffer with partial messages if any **/
        int fd;                   /**< connection handle */
        DltReceiverType type;     /**< type of connection handle */
        int32_t buffersize;       /**< size of receiver buffer */
        struct sockaddr_in addr;  /**< socket address information */
    } DltReceiver;
    """

    _fields_ = [
        # Byte counters
        ("lastBytesRcvd", ctypes.c_int32),
        ("bytesRcvd", ctypes.c_int32),
        ("totalBytesRcvd", ctypes.c_int32),
        # Buffer pointers (char * in C)
        ("buffer", ctypes.POINTER(ctypes.c_char)),
        ("buf", ctypes.POINTER(ctypes.c_char)),
        ("backup_buf", ctypes.POINTER(ctypes.c_char)),
        # Connection handle and its DltReceiverType
        ("fd", ctypes.c_int),
        ("type", ctypes.c_int),
        # Size of the receiver buffer
        ("buffersize", ctypes.c_int32),
        # Socket address information
        ("addr", sockaddr_in),
    ]


class cDltClient(ctypes.Structure):  # pylint: disable=invalid-name
    """ctypes mirror of the C ``DltClient`` structure.

    ``_fields_`` follows the C declaration below member by member; keep both
    in the same order.

    typedef struct
    {
        DltReceiver receiver;      /**< receiver pointer to dlt receiver structure */
        int sock;                  /**< sock Connection handle/socket */
        char *servIP;              /**< servIP IP address/Hostname of TCP/IP interface */
        char *hostip;              /**< IP multicast address of group */
        int port;                  /**< Port for TCP connections (optional) */
        char *serialDevice;        /**< serialDevice Devicename of serial device */
        char *socketPath;          /**< socketPath Unix socket path */
        char ecuid[4];             /**< ECUiD */
        speed_t baudrate;          /**< baudrate Baudrate of serial interface, as speed_t */
        DltClientMode mode;        /**< mode DltClientMode */
        int send_serial_header;    /**< (Boolean) Send DLT messages with serial header */
        int resync_serial_header;  /**< (Boolean) Resync to serial header on all connection */
    } DltClient;
    """

    _fields_ = [
        # Embedded receiver state
        ("receiver", cDltReceiver),
        # Connection handle/socket
        ("sock", ctypes.c_int),
        # Connection endpoints
        ("servIP", ctypes.c_char_p),
        ("hostip", ctypes.c_char_p),
        ("port", ctypes.c_int),
        ("serialDevice", ctypes.c_char_p),
        ("socketPath", ctypes.c_char_p),
        # ECU id, char[4] in C
        ("ecuid", ctypes.c_char * 4),
        # speed_t in C, declared here as unsigned int
        ("baudrate", ctypes.c_uint),
        # DltClientMode value
        ("mode", ctypes.c_int),
        # Boolean flags stored as int
        ("send_serial_header", ctypes.c_int),
        ("resync_serial_header", ctypes.c_int),
    ]


class cDLTFilter(ctypes.Structure):  # pylint: disable=invalid-name
    """ctypes mirror of the C ``DltFilter`` structure, with a helper to add filters.

    ``_fields_`` follows the C declaration below member by member; keep both
    in the same order.

    typedef struct
    {
        char apid[DLT_FILTER_MAX][DLT_ID_SIZE]; /**< application id */
        char ctid[DLT_FILTER_MAX][DLT_ID_SIZE]; /**< context id */
        int log_level[DLT_FILTER_MAX];          /**< log level */
        int32_t payload_max[DLT_FILTER_MAX];    /**< upper border for payload */
        int32_t payload_min[DLT_FILTER_MAX];    /**< lower border for payload */
        int  counter;                           /**< number of filters */
    } DltFilter;
    """

    _fields_ = [
        ("apid", (ctypes.c_char * DLT_ID_SIZE) * DLT_FILTER_MAX),
        ("ctid", (ctypes.c_char * DLT_ID_SIZE) * DLT_FILTER_MAX),
        ("log_level", ctypes.c_int * DLT_FILTER_MAX),
        ("payload_max", (ctypes.c_int32 * DLT_FILTER_MAX)),
        ("payload_min", (ctypes.c_int32 * DLT_FILTER_MAX)),
        ("counter", ctypes.c_int),
    ]

    # Flag passed to dlt_filter_add() as its last argument. It is not part of
    # the C layout. A default is defined here so that add() also works on a
    # plain cDLTFilter; a subclass or an instance may override it.
    verbose = 0

    # pylint: disable=too-many-arguments
    def add(self, apid, ctid, log_level=0, payload_min=0, payload_max=ctypes.c_uint32(-1).value // 2):
        """Add new filter pair

        :param apid: application id as ``str`` or ``bytes``; a ``str`` is
            encoded as ASCII, a falsy value is passed on as ``b""``
        :param ctid: context id, handled the same way as ``apid``
        :param log_level: log level forwarded to ``dlt_filter_add()``
        :param payload_min: lower payload border forwarded to ``dlt_filter_add()``
        :param payload_max: upper payload border forwarded to
            ``dlt_filter_add()``; the default evaluates to 2147483647
        :returns: 0 when the library call did not return DLT_RETURN_ERROR,
            MAX_FILTER_REACHED when it failed and ``counter`` has reached
            DLT_FILTER_MAX, REPEATED_FILTER for any other failure
        :raises UnicodeEncodeError: if a ``str`` id contains non-ASCII characters
        """
        if isinstance(apid, str):
            apid = bytes(apid, "ascii")
        if isinstance(ctid, str):
            ctid = bytes(ctid, "ascii")

        result = dltlib.dlt_filter_add(
            ctypes.byref(self), apid or b"", ctid or b"", log_level, payload_min, payload_max, self.verbose
        )
        if result != DLT_RETURN_ERROR:
            return 0

        # The library reports both failure causes with the same code; a full
        # filter table is told apart from a duplicate by looking at counter.
        if self.counter >= DLT_FILTER_MAX:
            logger.error("Maximum number (%d) of allowed filters reached, ignoring filter!\n", DLT_FILTER_MAX)
            return MAX_FILTER_REACHED

        logger.debug("Filter ('%s', '%s') already exists", apid, ctid)
        return REPEATED_FILTER