1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
"""Parse datasets TLV encoded as specified by Thread."""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import IntEnum
import struct
class TLVError(Exception):
    """Raised when TLV data cannot be parsed or decoded."""
class MeshcopTLVType(IntEnum):
    """Tag values that identify the TLV items of a dataset."""

    CHANNEL = 0
    PANID = 1
    EXTPANID = 2
    NETWORKNAME = 3
    PSKC = 4
    NETWORKKEY = 5
    NETWORK_KEY_SEQUENCE = 6
    MESHLOCALPREFIX = 7
    STEERING_DATA = 8
    BORDER_AGENT_RLOC = 9
    COMMISSIONER_ID = 10
    COMM_SESSION_ID = 11
    SECURITYPOLICY = 12
    GET = 13
    ACTIVETIMESTAMP = 14
    COMMISSIONER_UDP_PORT = 15
    STATE = 16
    JOINER_DTLS = 17
    JOINER_UDP_PORT = 18
    JOINER_IID = 19
    JOINER_RLOC = 20
    JOINER_ROUTER_KEK = 21

    PROVISIONING_URL = 32
    VENDOR_NAME_TLV = 33
    VENDOR_MODEL_TLV = 34
    VENDOR_SW_VERSION_TLV = 35
    VENDOR_DATA_TLV = 36
    VENDOR_STACK_VERSION_TLV = 37

    UDP_ENCAPSULATION_TLV = 48
    IPV6_ADDRESS_TLV = 49

    PENDINGTIMESTAMP = 51
    DELAYTIMER = 52
    CHANNELMASK = 53
    COUNT = 54
    PERIOD = 55
    SCAN_DURATION = 56
    ENERGY_LIST = 57

    # Seen in a dataset imported through iOS companion app
    APPLE_TAG_UNKNOWN = 74

    DISCOVERYREQUEST = 128
    DISCOVERYRESPONSE = 129

    JOINERADVERTISEMENT = 241
@dataclass
class MeshcopTLVItem:
    """Generic TLV item holding a tag and its raw value bytes."""

    # Tag of the item.
    tag: int
    # Undecoded value bytes of the item.
    data: bytes

    def __str__(self) -> str:
        """Return the raw value rendered as a hex string."""
        hex_text = self.data.hex()
        return hex_text
@dataclass
class Channel(MeshcopTLVItem):
    """TLV item whose value is read as a big-endian channel number."""

    # Filled in by __post_init__ from ``data``; not a constructor argument.
    channel: int = field(init=False)

    def __post_init__(self) -> None:
        """Convert ``data`` to an integer and reject a value of zero.

        Raises:
            TLVError: If the decoded channel is 0 (this includes empty data).
        """
        number = int.from_bytes(self.data, byteorder="big")
        self.channel = number
        if number == 0:
            raise TLVError(f"invalid channel '{self.channel}'")
@dataclass
class NetworkName(MeshcopTLVItem):
    """TLV item whose value is a text network name."""

    # Filled in by __post_init__ from ``data``; not a constructor argument.
    name: str = field(init=False)

    def __post_init__(self) -> None:
        """Decode ``data`` as UTF-8 text and store it in ``name``.

        Raises:
            TLVError: If ``data`` is not valid UTF-8.
        """
        raw = self.data
        try:
            decoded = raw.decode("utf-8")
        except UnicodeDecodeError as err:
            raise TLVError(f"invalid network name '{self.data.hex()}'") from err
        self.name = decoded

    def __str__(self) -> str:
        """Return the decoded network name."""
        return self.name
@dataclass
class Timestamp(MeshcopTLVItem):
    """Timestamp item decoded from an 8-byte big-endian value.

    The decoded attributes are filled in by ``__post_init__`` from ``data``
    and are not constructor arguments.
    """

    # True when the lowest bit of the packed value is set.
    authoritative: bool = field(init=False)
    # Upper 48 bits of the packed value.
    seconds: int = field(init=False)
    # 15-bit tick counter stored between the seconds and the flag bit.
    ticks: int = field(init=False)

    def __post_init__(self) -> None:
        """Decode the timestamp.

        Raises:
            TLVError: If ``data`` cannot be unpacked as exactly 8 bytes.
        """
        # The timestamps are packed in 8 bytes:
        # [seconds 48 bits][ticks 15 bits][authoritative flag 1 bit]
        try:
            unpacked: int = struct.unpack("!Q", self.data)[0]
        except struct.error as err:
            # Report malformed data the same way the other item decoders do.
            raise TLVError(f"invalid timestamp '{self.data.hex()}'") from err
        self.authoritative = bool(unpacked & 1)
        self.seconds = unpacked >> 16
        # A 15-bit field needs a 0x7FFF mask; 0x7FF would drop its top 4 bits.
        self.ticks = (unpacked >> 1) & 0x7FFF
def _encode_item(item: MeshcopTLVItem) -> bytes:
    """Serialize one item as a tag byte, a length byte, then the raw value."""
    payload = item.data
    size = len(payload)
    # Network byte order: two unsigned bytes followed by ``size`` raw bytes.
    layout = "!BB" + str(size) + "s"
    return struct.pack(layout, item.tag, size, payload)
def encode_tlv(items: dict[MeshcopTLVType, MeshcopTLVItem]) -> str:
    """Serialize dataset items to a hex string of concatenated TLV records.

    Items are written in the dict's iteration order; the keys are not used.
    Any error raised while encoding an item propagates to the caller.
    """
    encoded_items = [_encode_item(entry) for entry in items.values()]
    return b"".join(encoded_items).hex()
def _parse_item(tag: MeshcopTLVType, data: bytes) -> MeshcopTLVItem:
    """Build the item object for one tag/value pair.

    Tags with a dedicated decoder class are built with that class; every
    other tag is wrapped in a plain ``MeshcopTLVItem``. Errors raised by a
    decoder propagate to the caller.
    """
    decoders: dict[MeshcopTLVType, type[MeshcopTLVItem]] = {
        MeshcopTLVType.ACTIVETIMESTAMP: Timestamp,
        MeshcopTLVType.CHANNEL: Channel,
        MeshcopTLVType.NETWORKNAME: NetworkName,
    }
    item_cls = decoders.get(tag, MeshcopTLVItem)
    return item_cls(tag, data)
def parse_tlv(data: str) -> dict[MeshcopTLVType, MeshcopTLVItem]:
    """Parse a hex-encoded TLV dataset into its items.

    Args:
        data: Hex string holding a sequence of TLV records, each laid out as
            one tag byte, one length byte, then ``length`` value bytes.

    Returns:
        The decoded items keyed by tag, in the order they appear in ``data``.

    Raises:
        TLVError: If ``data`` is not valid hex, a tag is not a known
            ``MeshcopTLVType``, a record is truncated, or a tag occurs twice.
            Errors raised while decoding an individual item propagate
            unchanged.
    """
    try:
        data_bytes = bytes.fromhex(data)
    except ValueError as err:
        raise TLVError("invalid tlvs") from err

    result: dict[MeshcopTLVType, MeshcopTLVItem] = {}
    total = len(data_bytes)
    pos = 0
    while pos < total:
        try:
            tag = MeshcopTLVType(data_bytes[pos])
        except ValueError as err:
            raise TLVError(f"unknown type {data_bytes[pos]}") from err
        pos += 1

        # A tag in the very last byte has no length byte after it; report
        # that as a TLVError instead of letting the lookup raise IndexError.
        if pos >= total:
            raise TLVError(f"missing length byte for {tag.name}")
        _len = data_bytes[pos]
        pos += 1

        # Slicing never raises, so a short slice is how truncation shows up.
        val = data_bytes[pos : pos + _len]
        if len(val) < _len:
            raise TLVError(f"expected {_len} bytes for {tag.name}, got {len(val)}")
        pos += _len

        if tag in result:
            raise TLVError(f"duplicated tag {tag.name}")
        result[tag] = _parse_item(tag, val)
    return result
|