1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
"""Identity module.
This module contains all the classes and methods used to handle
the identity of an Asus device.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from itertools import chain
import logging
from typing import Any
from asusrouter.error import AsusRouterIdentityError
from asusrouter.modules.aimesh import AiMeshDevice
from asusrouter.modules.color import color_zone
from asusrouter.modules.data import AsusData
from asusrouter.modules.endpoint import (
Endpoint,
EndpointNoCheck,
EndpointTools,
EndpointType,
check_available,
)
from asusrouter.modules.endpoint.onboarding import (
process as process_onboarding,
read as read_onboarding,
)
from asusrouter.modules.firmware import Firmware
from asusrouter.modules.wlan import WLAN_TYPE, Wlan
from asusrouter.tools import writers
from asusrouter.tools.cleaners import clean_dict
from asusrouter.tools.converters import (
safe_exists,
safe_list_from_string,
safe_unpack_keys,
)
# Module-level logger
_LOGGER = logging.getLogger(__name__)

# Identity map. Each entry is `(nvram_key, identity_key[, converter])`:
# - `nvram_key` is the value requested from the device NVRAM
#   (see `collect_identity`),
# - `identity_key` is the key the value is stored under in the identity
#   dictionary built by `_read_nvram`,
# - `converter` (optional) is applied to the raw NVRAM value before storing.
#
# Several entries may share one `identity_key`. In `_read_nvram`, a value that
# is already stored as a list is extended by the later entry (this is how
# `rc_support` and `ss_support` are merged into `services`); any other
# existing value is overwritten.
#
# `productid` is mapped to both `product_id` and `model`; `collect_identity`
# may later replace `model` with the name reported by the onboarding data.
MAP_IDENTITY: tuple = (
    ("serial_no", "serial"),
    ("label_mac", "mac"),
    ("lan_hwaddr", "lan_mac"),
    ("wan_hwaddr", "wan_mac"),
    ("productid", "product_id"),
    ("productid", "model"),
    ("firmver", "fw_major"),
    ("buildno", "fw_minor"),
    ("extendno", "fw_build"),
    ("rc_support", "services", safe_list_from_string),
    ("ss_support", "services", safe_list_from_string),
    ("led_val", "led", safe_exists),
    ("ledg_rgb1", "aura", safe_exists),
    ("ledg_rgb2", "aura_zone", color_zone),
)
@dataclass
class AsusDevice:  # pylint: disable=too-many-instance-attributes
    """Asus device class.

    This class contains information about an Asus device
    and can represent a router, AiMesh node or range extender.

    Instances are created by `collect_identity` from the identity
    dictionary, so every field name must match a key of that dictionary.
    """

    # Device-defining values
    # Read from NVRAM `serial_no`
    serial: str | None = None
    # Read from NVRAM `label_mac`; `_read_nvram` falls back to the LAN
    # and then the WAN MAC when it is empty
    mac: str | None = None
    # Read from NVRAM `productid`
    product_id: str | None = None
    # Read from NVRAM `productid`; replaced by the onboarding model name
    # when the device is found in the onboarding data
    model: str | None = None
    brand: str = "ASUSTek"

    # Supported features
    # True when `amas` is listed in `services`
    aimesh: bool = False

    # Device information
    # Built from the NVRAM `firmver`, `buildno` and `extendno` values
    firmware: Firmware | None = None
    # True when the SYSINFO endpoint is reported as available
    merlin: bool = False
    # WLAN types for every service that has an entry in `WLAN_TYPE`
    wlan: list[Wlan] | None = None
    # Availability of each checked endpoint
    endpoints: dict[EndpointType, bool] | None = None
    # Merged list from NVRAM `rc_support` and `ss_support`
    services: list[str] | None = None

    # Flags for device features
    # From NVRAM `ledg_rgb1` (via `safe_exists`)
    aura: bool = False
    # From NVRAM `ledg_rgb2` (via `color_zone`)
    aura_zone: int = 0
    # True when the DSL endpoint is reported as available
    dsl: bool = False
    # From NVRAM `led_val` (via `safe_exists`)
    led: bool = False
    # True when `ookla` is listed in `services`
    ookla: bool = False
    # NOTE(review): not assigned anywhere in the visible code - confirm
    # where this flag is populated
    vpn_status: bool = False
async def collect_identity(
    api_hook: Callable[..., Awaitable[dict[str, Any]]],
    api_query: Callable[..., Awaitable[Any]],
) -> AsusDevice:
    """Collect device identity.

    Requests every NVRAM value named in `MAP_IDENTITY`, converts the reply
    into an identity dictionary, probes the endpoints and derives the
    feature flags from the results.

    Args:
        api_hook: Awaitable callable which receives the NVRAM request and
            returns the raw identity values.
        api_query: Awaitable callable passed to the endpoint checks.

    Returns:
        The `AsusDevice` built from the collected identity.

    Raises:
        AsusRouterIdentityError: When `api_hook` raises any exception, or
            when `_read_nvram` rejects the received data.
    """

    _LOGGER.debug("Collecting device identity")

    # The first element of each map entry is the NVRAM key to request
    nvram_keys = [key for key, _, _ in map(safe_unpack_keys, MAP_IDENTITY)]
    request = writers.nvram(nvram_keys)

    # Any failure while fetching is reported as an identity error
    try:
        raw_identity = await api_hook(request)
    except Exception as ex:  # pylint: disable=broad-except
        raise AsusRouterIdentityError(
            "Failed to collect identity data from the router"
        ) from ex
    _LOGGER.debug("Identity collected")

    identity = _read_nvram(raw_identity)
    _LOGGER.debug("Identity read")

    # AiMesh support is advertised through the `amas` service
    identity["aimesh"] = "amas" in identity["services"]

    endpoints, onboarding = await _check_endpoints(api_query)
    identity["endpoints"] = endpoints
    _LOGGER.debug("Endpoints checked")

    # The Aura endpoint is not probed, its availability follows the NVRAM flag
    endpoints[EndpointTools.AURA] = identity.get("aura", False)

    # Prefer the model name reported by the onboarding data for this device
    mesh_entry = onboarding.get(identity["mac"])
    if isinstance(mesh_entry, AiMeshDevice):
        identity["model"] = mesh_entry.model

    # Flags derived from endpoint availability
    endpoint_flags = (
        (Endpoint.DSL, "dsl", "DSL-compatible device detected"),
        (Endpoint.SYSINFO, "merlin", "Merlin FW detected"),
    )
    for endpoint, flag, message in endpoint_flags:
        if endpoints[endpoint] is True:
            identity[flag] = True
            _LOGGER.debug(message)

    # Return the identity converted from a dict
    return AsusDevice(**identity)
# UPDATE THIS METHOD
def _read_nvram(data: dict[str, Any]) -> dict[str, Any]:  # noqa: C901, PLR0912
    """Read the NVRAM identity data.

    Converts the raw NVRAM values into the identity dictionary which is
    later unpacked into `AsusDevice`.

    Args:
        data: Raw NVRAM values keyed by NVRAM name. Every NVRAM key listed
            in `MAP_IDENTITY` is looked up in it.

    Returns:
        The identity dictionary. The helper keys (`lan_mac`, `wan_mac`,
        `fw_major`, `fw_minor`, `fw_build`) are removed and `firmware` and
        `wlan` are added; `ookla` is added only when the service is listed.

    Raises:
        AsusRouterIdentityError: When `data` is empty.
        KeyError: When `data` lacks one of the keys from `MAP_IDENTITY`.
    """

    # Check the input data
    if not data:
        raise AsusRouterIdentityError("No nvram data received")

    identity: dict[str, Any] = {}

    # Loop through the identity map. Errors are left to propagate unchanged
    for map_item in MAP_IDENTITY:
        key, key_to_use, method = safe_unpack_keys(map_item)
        raw_value = data[key]
        value = method(raw_value) if method else raw_value

        # A list already stored under this key is extended (several NVRAM
        # values feeding one identity key); anything else is overwritten
        existing = identity.get(key_to_use)
        if isinstance(existing, list):
            existing.extend(value)
        else:
            identity[key_to_use] = value

    # Clean up the identity dictionary
    # NOTE(review): `clean_dict` is defined elsewhere; the code below makes
    # no assumption on whether it keeps or drops cleaned keys
    identity = clean_dict(identity)

    # MAC (for some Merlin firmwares missing label_mac):
    # fall back to the LAN MAC first, then to the WAN MAC
    if not identity.get("mac"):
        fallback_mac = identity.get("lan_mac") or identity.get("wan_mac")
        if fallback_mac:
            identity["mac"] = fallback_mac

    # Remove extra MACs. They are optional fallbacks only, so a missing key
    # must not fail the whole identity
    identity.pop("lan_mac", None)
    identity.pop("wan_mac", None)

    # Firmware: the three parts are required and are consumed while
    # building the version string
    fw_major, fw_minor, fw_build = (
        identity.pop(part) for part in ("fw_major", "fw_minor", "fw_build")
    )
    identity["firmware"] = Firmware(f"{fw_major}.{fw_minor}.{fw_build}")

    services = identity["services"]

    # WLAN list: one entry per service known to `WLAN_TYPE`, in service order
    identity["wlan"] = [
        WLAN_TYPE[service] for service in services if service in WLAN_TYPE
    ]

    # OOKLA Speedtest
    if "ookla" in services:
        identity["ookla"] = True

    return identity
async def _check_endpoints(
    api_hook: Callable[..., Awaitable[Any]],
) -> tuple[dict[EndpointType, bool], dict[str, Any]]:
    """Check which endpoints are available.

    Every member of `Endpoint` and `EndpointTools` is probed with
    `check_available`, except those whose name is also a member of
    `EndpointNoCheck`.

    Args:
        api_hook: Awaitable callable handed over to `check_available`.

    Returns:
        A tuple of the availability per probed endpoint and the AiMesh part
        of the processed onboarding data. The latter is an empty dictionary
        when the onboarding endpoint is not available or holds no AiMesh
        data.
    """

    availability: dict[EndpointType, bool] = {}
    payloads: dict[EndpointType, Any] = {}

    skipped = EndpointNoCheck.__members__
    for candidate in chain(Endpoint, EndpointTools):
        if candidate.name not in skipped:
            availability[candidate], payloads[candidate] = await check_available(
                candidate, api_hook
            )

    # Without the onboarding endpoint there is nothing to process
    if availability[Endpoint.ONBOARDING] is not True:
        return (availability, {})

    raw_onboarding = read_onboarding(payloads[Endpoint.ONBOARDING])
    processed = process_onboarding(raw_onboarding)
    return (availability, processed.get(AsusData.AIMESH, {}))
|