File: devicemap.py

package info (click to toggle)
python-asusrouter 1.21.3-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,856 kB
  • sloc: python: 20,497; makefile: 3
file content (316 lines) | stat: -rw-r--r-- 10,128 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""Devicemap endpoint module."""

from __future__ import annotations

from datetime import datetime, timedelta
import logging
import re
from typing import Any

import xmltodict

from asusrouter.config import ARConfig, ARConfigKey as ARConfKey
from asusrouter.modules.data import AsusData, AsusDataState
from asusrouter.modules.endpoint import data_get
from asusrouter.modules.openvpn import AsusOVPNClient, AsusOVPNServer
from asusrouter.tools.cleaners import clean_dict, clean_dict_key_prefix
from asusrouter.tools.converters import safe_datetime, safe_int
from asusrouter.tools.readers import merge_dicts

from .devicemap_const import (
    DEVICEMAP_BY_INDEX,
    DEVICEMAP_BY_KEY,
    DEVICEMAP_CLEAR,
)

_LOGGER = logging.getLogger(__name__)

REQUIRE_HISTORY = True


def read(content: str, **kwargs: Any) -> dict[str, Any]:
    """Read devicemap data."""

    # Create a dict to store the data
    devicemap: dict[str, Any] = {}

    # Parse the XML data
    try:
        xml_content: dict[str, Any] = xmltodict.parse(content).get(
            "devicemap", {}
        )
        if not xml_content:
            _LOGGER.debug("Received empty devicemap XML")
            return devicemap
    except xmltodict.expat.ExpatError as ex:  # pyright: ignore[reportAttributeAccessIssue]
        _LOGGER.debug("Received invalid devicemap XML: %s", ex)
        return devicemap

    # Go through the data and fill the dict

    # Get values by index using read_devicemap_index method
    devicemap = merge_dicts(devicemap, read_index(xml_content))

    # Get values by key using read_devicemap_key method
    devicemap = merge_dicts(devicemap, read_key(xml_content))

    # Clear values from useless symbols
    for output_group, clear_map in DEVICEMAP_CLEAR.items():
        if output_group not in devicemap:
            continue
        for key, clear_value in clear_map.items():
            # If the key is not in the devicemap, continue
            if key not in devicemap[output_group]:
                continue
            devicemap[output_group][key] = devicemap[output_group][
                key
            ].replace(clear_value, "")

    # Clean the devicemap values
    devicemap = clean_dict(devicemap)

    # Clean the devicemap values_dict from output_group prefix
    for output_group, values_dict in devicemap.items():
        devicemap[output_group] = clean_dict_key_prefix(
            values_dict, output_group
        )

    # Return the devicemap
    return devicemap


def read_index(xml_content: dict[str, Any]) -> dict[str, Any]:
    """Read devicemap by index.

    This method performs reading of the devicemap by index
    to simplify the original read_devicemap method.
    """

    # Create a dict to store the data
    devicemap: dict[str, Any] = {}

    # Get values for which we only know their order (index)
    for output_group, input_group, input_values in DEVICEMAP_BY_INDEX:
        # Create an empty dictionary for the output group
        devicemap[output_group] = {}

        # Check that the input group is in the xml content
        if input_group not in xml_content:
            continue

        # Use dict comprehension to build output_group_data
        output_group_data = {
            input_value: xml_content[input_group][index]
            for index, input_value in enumerate(input_values)
            if index < len(xml_content[input_group])
        }

        # Add the output group data to the devicemap
        devicemap[output_group] = output_group_data

    # Return the devicemap
    return devicemap


def read_key(xml_content: dict[str, Any]) -> dict[str, Any]:
    """Read devicemap by key.

    This method performs reading of the devicemap by key
    to simplify the original read_devicemap method.
    """

    # Create a dict to store the data
    devicemap: dict[str, Any] = {}

    # Get values for which we know their key
    for output_group, input_group, input_values in DEVICEMAP_BY_KEY:
        # Create a dict to store the data
        output_group_data: dict[str, Any] = {}

        # Go through the input values and fill the dict
        for input_value in input_values:
            # Get the input group data
            xml_input_group = xml_content.get(input_group)

            # If the input group data is None, skip this iteration
            if xml_input_group is None:
                continue

            # If the input group data is a string, convert it to a list
            if isinstance(xml_input_group, str):
                xml_input_group = [xml_input_group]

            # Go through the input group data and check
            # if the input value is in it
            for value in xml_input_group:
                if input_value in value:
                    # Add the input value to the output group data
                    output_group_data[input_value] = value.replace(
                        f"{input_value}=", ""
                    )
                    break

        # Add the output group data to the devicemap
        devicemap[output_group] = output_group_data

    # Return the devicemap
    return devicemap


# This method performs reading of the devicemap special values
# pylint: disable-next=unused-argument
def read_special(xml_content: dict[str, Any]) -> dict[str, Any]:
    """Read devicemap special values."""

    # This method is not implemented yet

    return {}


def read_uptime_string(
    content: str,
) -> tuple[datetime | None, int | None]:
    """Read uptime string and return proper datetime object."""

    # Split the content into the date/time part and the seconds part
    uptime_parts = content.split("(")
    if len(uptime_parts) < 2:  # noqa: PLR2004
        return (None, None)

    # Extract the number of seconds from the seconds part
    seconds_match = re.search("([0-9]+)", uptime_parts[1])
    if not seconds_match:
        return (None, None)
    seconds: int | None = safe_int(seconds_match.group())

    when = safe_datetime(uptime_parts[0])
    if when is None or seconds is None:
        return (None, seconds)

    uptime = when - timedelta(seconds=seconds)

    # If robust_boottime is enabled, floor the uptime to even seconds
    # This will introduce a systematic error with up to 1 second delay
    # but will avoid raw data uncertainty and the resulting jitter
    if ARConfig.get(ARConfKey.ROBUST_BOOTTIME) is True:
        even_seconds = uptime.second - (uptime.second % 2)
        uptime = uptime.replace(second=even_seconds, microsecond=0)

    return (uptime, seconds)


def process(data: dict[str, Any]) -> dict[AsusData, Any]:
    """Process data from devicemap endpoint."""

    # Get the passed arguments
    history: dict[AsusData, AsusDataState] = data_get(data, "history") or {}

    # Devicemap - just the data itself
    devicemap = data

    # Boot time
    prev_boottime_object: AsusDataState | None = history.get(AsusData.BOOTTIME)
    prev_boottime = prev_boottime_object.data if prev_boottime_object else None
    boottime, reboot = process_boottime(devicemap, prev_boottime)

    # Mark reboot
    flags = {}
    if reboot:
        flags["reboot"] = True

    # OpenVPN
    openvpn = process_ovpn(devicemap)

    # Return the processed data
    return {
        AsusData.DEVICEMAP: devicemap,
        AsusData.BOOTTIME: boottime,
        AsusData.OPENVPN: openvpn,
        AsusData.FLAGS: flags,
    }


def process_boottime(
    devicemap: dict[str, Any], prev_boottime: dict[str, Any] | None
) -> tuple[dict[str, Any], bool]:
    """Process boottime data."""

    # Reboot flag
    reboot = False

    boottime: dict[str, Any] = {}

    # Since precision is 1 second, could be that old and new
    # are 1 sec different. In this case, we should not change
    # the boot time, but keep the previous value to avoid regular changes
    sys = devicemap.get("sys")
    if sys:
        uptime_str = sys.get("uptimeStr")
        _LOGGER.debug("Uptime string: %s", uptime_str)
        if uptime_str:
            time, seconds = read_uptime_string(uptime_str)
            if time:
                boottime["datetime"] = time
                if prev_boottime and "datetime" in prev_boottime:
                    delta = time - prev_boottime["datetime"]

                    # Check for reboot
                    if abs(delta.seconds) >= 2 and delta.seconds >= 0:  # noqa: PLR2004
                        reboot = True
                    else:
                        boottime = prev_boottime

                boottime["uptime"] = seconds

    return boottime, reboot


def process_ovpn(devicemap: dict[str, Any]) -> dict[str, Any]:
    """Process OpenVPN data."""

    vpn: dict[str, Any] = {
        "client": {},
        "server": {},
    }
    vpnmap = devicemap.get("vpn")

    if vpnmap:
        # There are only 5 clients actually in the current firmware
        # but we in case it will be ever changed in the future
        for num in range(1, 10):
            # Check if this client exists
            if f"client{num}_state" not in vpnmap:
                break

            # Get client data
            # We define default state as 0, since it's not always present
            client_state = AsusOVPNClient(
                safe_int(vpnmap.get(f"client{num}_state"), default=0)
            )
            client_errno = safe_int(vpnmap.get(f"client{num}_errno"))

            # Assign client data
            vpn["client"][num] = {
                "state": client_state,
                "errno": client_errno,
            }

        # Server data. Usually only 2 servers
        # but we in case it will be ever changed in the future
        for num in range(1, 5):
            # Check if this server exists
            if f"server{num}_state" not in vpnmap:
                break

            # Get server data
            # We define default state as 0, since it's not always present
            server_state = AsusOVPNServer(
                safe_int(vpnmap.get(f"server{num}_state"), default=0)
            )

            # Assign server data
            vpn["server"][num] = {
                "state": server_state,
            }

    return vpn