File: mqtt_packet.py

package info (click to toggle)
python-roborock 2.39.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,128 kB
  • sloc: python: 10,342; makefile: 17
file content (133 lines) | stat: -rw-r--r-- 3,721 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Module for crafting MQTT packets.

This library is copied from the paho MQTT client library tests, keeping just
the parts needed for some roborock messages. The message format implemented in
this file is not specific to roborock.
"""

import struct

# MQTT v5 property identifiers (0x21 and 0x22); both carry a uint16 value and
# are attached to CONNACK packets by gen_connack's property helper.
PROP_RECEIVE_MAXIMUM = 33
PROP_TOPIC_ALIAS_MAXIMUM = 34


def gen_uint16_prop(identifier: int, word: int) -> bytes:
    """Encode a property as a one-byte identifier followed by a uint16 value.

    The value is written in network (big-endian) byte order.
    """
    return struct.pack("!BH", identifier, word)


def pack_varint(varint: int) -> bytes:
    """Pack a non-negative integer as a variable byte integer.

    Each output byte carries seven bits of the value, least-significant group
    first. The top bit of a byte is set when more bytes follow.

    Args:
        varint: The non-negative value to encode.

    Returns:
        The encoded bytes. The result is always at least one byte long; zero
        encodes as a single zero byte.

    Raises:
        ValueError: If ``varint`` is negative. Floor division never brings a
            negative value to zero, so the encoding loop would not terminate.
    """
    if varint < 0:
        raise ValueError(f"Cannot encode a negative value as a variable byte integer: {varint}")

    encoded = bytearray()
    while True:
        varint, digit = divmod(varint, 128)
        if varint > 0:
            # More digits follow, so flag this one with the continuation bit.
            digit |= 0x80
        encoded.append(digit)
        if varint == 0:
            return bytes(encoded)


def prop_finalise(props: bytes) -> bytes:
    """Prefix an encoded property block with its variable-integer length.

    ``None`` is treated as "no properties" and yields only a zero length.
    """
    if props is not None:
        return pack_varint(len(props)) + props
    return pack_varint(0)


def gen_connack(
    flags: int = 0,
    rc: int = 0,
    properties: bytes | None = b"",
    property_helper: bool = True,
) -> bytes:
    """Generate a CONNACK packet.

    Args:
        flags: Value of the acknowledge-flags byte.
        rc: Value of the reason-code byte.
        properties: Already-encoded properties without their length prefix,
            or ``None`` for no properties.
        property_helper: When true and ``properties`` is not ``None``, the
            given properties are wrapped with a Topic Alias Maximum of 10 and
            a Receive Maximum of 20. When true and ``properties`` is ``None``,
            an empty property block is sent.

    Returns:
        The encoded packet: the type byte (32), the remaining length, the
        flags and reason code, then the length-prefixed properties.
    """
    if property_helper:
        if properties is None:
            properties = b""
        else:
            properties = (
                gen_uint16_prop(PROP_TOPIC_ALIAS_MAXIMUM, 10) + properties + gen_uint16_prop(PROP_RECEIVE_MAXIMUM, 20)
            )

    variable_header = struct.pack("!BB", flags, rc) + prop_finalise(properties)

    # The remaining length is a variable byte integer. Packing it as a single
    # raw byte is only correct up to 127 and fails outright above 255.
    return struct.pack("!B", 32) + pack_varint(len(variable_header)) + variable_header


def gen_suback(mid: int, qos: int = 0) -> bytes:
    """Generate a SUBACK packet carrying a single result byte.

    The packet is the type byte (144), a remaining length of 4, the 16-bit
    message id, a zero byte (presumably the empty MQTT v5 property length),
    and finally ``qos``.
    """
    packet_type = 144
    # Two bytes of message id, one zero byte, and one byte for ``qos``.
    remaining_length = 4
    return struct.pack("!BBHBB", packet_type, remaining_length, mid, 0, qos)


def _gen_short(cmd: int, reason_code: int) -> bytes:
    """Build a three-byte packet: command byte, remaining length 1, reason code."""
    remaining_length = 1
    return struct.pack("!3B", cmd, remaining_length, reason_code)


def gen_disconnect(reason_code: int = 0) -> bytes:
    """Generate a DISCONNECT packet (command byte 0xE0) with the given reason code."""
    disconnect_cmd = 0xE0
    return _gen_short(disconnect_cmd, reason_code)


def _gen_command_with_mid(cmd: int, mid: int, reason_code: int = 0) -> bytes:
    """Build a packet of command byte, remaining length 3, 16-bit mid, reason code."""
    # Two bytes of message id plus one reason-code byte.
    remaining_length = 3
    return struct.pack("!BBHB", cmd, remaining_length, mid, reason_code)


def gen_puback(mid: int, reason_code: int = 0) -> bytes:
    """Generate a PUBACK packet (command byte 64) for message id ``mid``."""
    puback_cmd = 64
    return _gen_command_with_mid(puback_cmd, mid, reason_code)


def _pack_remaining_length(remaining_length: int) -> bytes:
    """Pack a packet's remaining length as a variable byte integer.

    The value is emitted seven bits at a time, least-significant group first,
    with the top bit of each byte set while further bytes follow.

    Args:
        remaining_length: The non-negative length to encode.

    Returns:
        The encoded bytes. The result is always at least one byte long.

    Raises:
        ValueError: If ``remaining_length`` is negative. Floor division never
            brings a negative value to zero, so the loop would not terminate.
    """
    if remaining_length < 0:
        raise ValueError(f"Remaining length must be non-negative, got {remaining_length}")

    out = bytearray()
    while True:
        remaining_length, low_bits = divmod(remaining_length, 128)
        # Set the continuation bit whenever higher-order digits are still pending.
        out.append(low_bits | 0x80 if remaining_length > 0 else low_bits)
        if remaining_length == 0:
            return bytes(out)


def gen_publish(
    topic: str,
    payload: bytes | None = None,
    retain: bool = False,
    dup: bool = False,
    mid: int = 0,
    properties: bytes = b"",
) -> bytes:
    """Generate a PUBLISH packet.

    The packet is laid out as: command byte, remaining length, 16-bit topic
    length, topic, length-prefixed properties, payload.

    Args:
        topic: Topic name. A ``str`` is UTF-8 encoded; ``bytes`` are used as-is.
        payload: Raw payload bytes, or ``None`` for an empty payload.
        retain: When true, set bit 0 of the command byte.
        dup: When true, set bit 3 of the command byte.
        mid: Unused. No message id is written into the packet; the parameter
            is kept so existing callers continue to work.
        properties: Already-encoded properties without their length prefix.

    Returns:
        The encoded packet.

    Raises:
        TypeError: If ``topic`` is neither ``str`` nor bytes-like.
    """
    if isinstance(topic, str):
        topic_b = topic.encode("utf-8")
    elif isinstance(topic, (bytes, bytearray)):
        topic_b = bytes(topic)
    else:
        # Previously this path failed later with an UnboundLocalError.
        raise TypeError(f"topic must be str or bytes, not {type(topic).__name__}")

    body = b"".join(
        (
            struct.pack("!H", len(topic_b)),
            topic_b,
            # prop_finalise adds the variable-integer length prefix.
            prop_finalise(properties),
            b"" if payload is None else payload,
        )
    )

    cmd = 48
    if retain:
        cmd |= 0x01
    if dup:
        cmd |= 0x08

    return struct.pack("!B", cmd) + _pack_remaining_length(len(body)) + body