File: utils.py

package info (click to toggle)
python-can 4.5.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,372 kB
  • sloc: python: 25,840; makefile: 38; sh: 20
file content (87 lines) | stat: -rw-r--r-- 2,833 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""
Defines common socketcan functions.
"""

import errno
import json
import logging
import os
import struct
import subprocess
import sys
from typing import List, Optional, cast

from can import typechecking
from can.interfaces.socketcan.constants import CAN_EFF_FLAG

log = logging.getLogger(__name__)


def pack_filters(can_filters: Optional[typechecking.CanFilters] = None) -> bytes:
    if can_filters is None:
        # Pass all messages
        can_filters = [{"can_id": 0, "can_mask": 0}]

    can_filter_fmt = f"={2 * len(can_filters)}I"
    filter_data = []
    for can_filter in can_filters:
        can_id = can_filter["can_id"]
        can_mask = can_filter["can_mask"]
        if "extended" in can_filter:
            can_filter = cast(typechecking.CanFilterExtended, can_filter)
            # Match on either 11-bit OR 29-bit messages instead of both
            can_mask |= CAN_EFF_FLAG
            if can_filter["extended"]:
                can_id |= CAN_EFF_FLAG
        filter_data.append(can_id)
        filter_data.append(can_mask)

    return struct.pack(can_filter_fmt, *filter_data)


def find_available_interfaces() -> List[str]:
    """Returns the names of all open can/vcan interfaces

    The function calls the ``ip link list`` command. If the lookup fails, an error
    is logged to the console and an empty list is returned.

    :return: The list of available and active CAN interfaces or an empty list of the command failed
    """
    if sys.platform != "linux":
        return []

    try:
        command = ["ip", "-json", "link", "list", "up"]
        output_str = subprocess.check_output(command, text=True)
    except Exception:  # pylint: disable=broad-except
        # subprocess.CalledProcessError is too specific
        log.exception("failed to fetch opened can devices from ip link")
        return []

    try:
        output_json = json.loads(output_str)
    except json.JSONDecodeError:
        log.exception("Failed to parse ip link JSON output: %s", output_str)
        return []

    log.debug(
        "find_available_interfaces(): detected these interfaces (before filtering): %s",
        output_json,
    )

    interfaces = [i["ifname"] for i in output_json if i.get("link_type") == "can"]
    return interfaces


def error_code_to_str(code: Optional[int]) -> str:
    """
    Converts a given error code (errno) to a useful and human readable string.

    :param code: a possibly invalid/unknown error code
    :returns: a string explaining and containing the given error code, or a string
              explaining that the errorcode is unknown if that is the case
    """
    name = errno.errorcode.get(code, "UNKNOWN")  # type: ignore
    description = os.strerror(code) if code is not None else "NO DESCRIPTION AVAILABLE"

    return f"{name} (errno {code}): {description}"