File: utils.py

package info (click to toggle)
python-can 4.6.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,428 kB
  • sloc: python: 27,154; makefile: 32; sh: 16
file content (77 lines) | stat: -rw-r--r-- 2,603 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
"""
Defines common functions.
"""

from typing import Any, Optional, cast

from can import CanInterfaceNotImplementedError, Message
from can.typechecking import ReadableBytesLike

try:
    import msgpack
except ImportError:
    msgpack = None


def is_msgpack_installed(raise_exception: bool = True) -> bool:
    """Check whether the ``msgpack`` module is installed.

    :param raise_exception:
        If True, raise a :class:`can.CanInterfaceNotImplementedError` when ``msgpack`` is not installed.
        If False, return False instead.
    :return:
        True if ``msgpack`` is installed, False otherwise.
    :raises can.CanInterfaceNotImplementedError:
        If ``msgpack`` is not installed and ``raise_exception`` is True.
    """
    if msgpack is None:
        if raise_exception:
            raise CanInterfaceNotImplementedError("msgpack not installed")
        return False
    return True


def pack_message(message: Message) -> bytes:
    """
    Pack a can.Message into a msgpack byte blob.

    :param message: the message to be packed
    """
    is_msgpack_installed()
    as_dict = {
        "timestamp": message.timestamp,
        "arbitration_id": message.arbitration_id,
        "is_extended_id": message.is_extended_id,
        "is_remote_frame": message.is_remote_frame,
        "is_error_frame": message.is_error_frame,
        "channel": message.channel,
        "dlc": message.dlc,
        "data": message.data,
        "is_fd": message.is_fd,
        "bitrate_switch": message.bitrate_switch,
        "error_state_indicator": message.error_state_indicator,
    }
    return cast("bytes", msgpack.packb(as_dict, use_bin_type=True))


def unpack_message(
    data: ReadableBytesLike,
    replace: Optional[dict[str, Any]] = None,
    check: bool = False,
) -> Message:
    """Unpack a can.Message from a msgpack byte blob.

    :param data: the raw data
    :param replace: a mapping from field names to values to be replaced after decoding the new message, or
                    `None` to disable this feature
    :param check: this is passed to :meth:`can.Message.__init__` to specify whether to validate the message

    :raise TypeError: if the data contains key that are not valid arguments for :meth:`can.Message.__init__`
    :raise ValueError: if `check` is true and the message metadata is invalid in some way
    :raise Exception: if there was another problem while unpacking
    """
    is_msgpack_installed()
    as_dict = msgpack.unpackb(data, raw=False)
    if replace is not None:
        as_dict.update(replace)
    return Message(check=check, **as_dict)