1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
"""Data module."""
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from enum import Enum
from typing import Any

from asusrouter.tools.converters import safe_bool
class AsusData(str, Enum):
    """Identifiers of the data kinds handled by AsusRouter.

    The enum mixes in ``str``, so every member is also a plain string
    and compares equal to its value (``AsusData.CPU == "cpu"``).
    Members are listed in alphabetical order.
    """

    AIMESH = "aimesh"
    AURA = "aura"
    BOOTTIME = "boottime"
    CLIENTS = "clients"
    CPU = "cpu"
    DDNS = "ddns"
    DEVICEMAP = "devicemap"
    DSL = "dsl"
    FIRMWARE = "firmware"
    FIRMWARE_NOTE = "firmware_note"
    FLAGS = "flags"
    GWLAN = "gwlan"
    LED = "led"
    NETWORK = "network"
    NODE_INFO = "node_info"
    OPENVPN = "openvpn"
    OPENVPN_CLIENT = "openvpn_client"
    OPENVPN_SERVER = "openvpn_server"
    PARENTAL_CONTROL = "parental_control"
    PING = "ping"
    PORT_FORWARDING = "port_forwarding"
    PORTS = "ports"
    RAM = "ram"
    SPEEDTEST = "speedtest"
    # Disabled member, kept for reference:
    # SPEEDTEST_HISTORY = "speedtest_history"
    SPEEDTEST_RESULT = "speedtest_result"
    # Disabled member, kept for reference:
    # SPEEDTEST_SERVERS = "speedtest_servers"
    SYSINFO = "sysinfo"
    SYSTEM = "system"
    TEMPERATURE = "temperature"
    VPNC = "vpnc"
    VPNC_CLIENTLIST = "vpnc_clientlist"
    WAN = "wan"
    WIREGUARD = "wireguard"
    WIREGUARD_CLIENT = "wireguard_client"
    WIREGUARD_SERVER = "wireguard_server"
    WLAN = "wlan"
@dataclass
class AsusDataState:
"""State of data."""
data: Any | None = None
timestamp: datetime = datetime.now(UTC)
active: bool = False
inactive_event: asyncio.Event = asyncio.Event()
def start(self) -> None:
"""Set to active."""
self.active = True
self.inactive_event.clear()
def stop(self) -> None:
"""Set to not-active."""
self.active = False
self.inactive_event.set()
def update(self, data: Any) -> None:
"""Update the state."""
self.data = data
# Set timestamp to the current utc time
self.timestamp = datetime.now(UTC)
# Set to inactive
self.stop()
def update_state(self, state: Any, last_id: int | None = None) -> None:
"""Update a state variable in the data dict."""
# Convert the state if needed
state = convert_state(state)
if last_id is not None:
if not isinstance(self.data, dict):
self.data = {}
self.data.setdefault(last_id, {})["state"] = state
return
if isinstance(self.data, dict):
self.data["state"] = state
return
self.data = {"state": state}
def offset_time(self, offset: int | None) -> None:
"""Offset the timestamp."""
if offset is None:
self.timestamp = datetime.now(UTC)
return
self.timestamp = datetime.now(UTC) + timedelta(seconds=offset)
def convert_state(state: Any) -> bool:
    """Return the state as a boolean.

    A value that is already a ``bool`` is returned unchanged. Anything
    else is passed through ``safe_bool`` and the result is coerced
    with ``bool()``.
    """

    # Already boolean - nothing to convert
    if isinstance(state, bool):
        return state

    return bool(safe_bool(state))
|