File: health_check.py

package info (click to toggle)
zwave-js-server-python 0.67.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,820 kB
  • sloc: python: 15,886; sh: 21; javascript: 16; makefile: 2
file content (149 lines) | stat: -rw-r--r-- 5,341 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Provide a model for the Z-Wave JS node's health checks and power tests."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import TypedDict

from ...const import PowerLevel


class LifelineHealthCheckResultDataType(TypedDict, total=False):
    """Represent a lifeline health check result data dict type."""

    # https://github.com/zwave-js/node-zwave-js/blob/master/packages/zwave-js/src/lib/node/Types.ts#L171
    latency: int  # required
    numNeighbors: int  # required
    failedPingsNode: int  # required
    rating: int  # required
    routeChanges: int
    minPowerlevel: int
    failedPingsController: int
    snrMargin: int


class LifelineHealthCheckSummaryDataType(TypedDict):
    """Represent a lifeline health check summary data dict type."""

    # https://github.com/zwave-js/node-zwave-js/blob/master/packages/zwave-js/src/lib/node/_Types.ts#L254
    results: list[LifelineHealthCheckResultDataType]
    rating: int


@dataclass
class LifelineHealthCheckResult:
    """Represent a lifeline health check result."""

    data: LifelineHealthCheckResultDataType = field(repr=False)
    latency: int = field(init=False)
    num_neighbors: int = field(init=False)
    failed_pings_node: int = field(init=False)
    rating: int = field(init=False)
    route_changes: int | None = field(init=False)
    min_power_level: PowerLevel | None = field(init=False, default=None)
    failed_pings_controller: int | None = field(init=False)
    snr_margin: int | None = field(init=False)

    def __post_init__(self) -> None:
        """Post initialize."""
        self.latency = self.data["latency"]
        self.num_neighbors = self.data["numNeighbors"]
        self.failed_pings_node = self.data["failedPingsNode"]
        self.rating = self.data["rating"]
        self.route_changes = self.data.get("routeChanges")
        if (min_power_level := self.data.get("minPowerlevel")) is not None:
            self.min_power_level = PowerLevel(min_power_level)
        self.failed_pings_controller = self.data.get("failedPingsController")
        self.snr_margin = self.data.get("snrMargin")


@dataclass
class LifelineHealthCheckSummary:
    """Represent a lifeline health check summary update."""

    data: LifelineHealthCheckSummaryDataType = field(repr=False)
    rating: int = field(init=False)
    results: list[LifelineHealthCheckResult] = field(init=False)

    def __post_init__(self) -> None:
        """Post initialize."""
        self.rating = self.data["rating"]
        self.results = [
            LifelineHealthCheckResult(r) for r in self.data.get("results", [])
        ]


class RouteHealthCheckResultDataType(TypedDict, total=False):
    """Represent a route health check result data dict type."""

    # https://github.com/zwave-js/node-zwave-js/blob/master/packages/zwave-js/src/lib/node/_Types.ts#L285
    numNeighbors: int  # required
    rating: int  # required
    failedPingsToTarget: int
    failedPingsToSource: int
    minPowerlevelSource: int
    minPowerlevelTarget: int


class RouteHealthCheckSummaryDataType(TypedDict):
    """Represent a route health check summary data dict type."""

    # https://github.com/zwave-js/node-zwave-js/blob/master/packages/zwave-js/src/lib/node/_Types.ts#L317
    results: list[RouteHealthCheckResultDataType]
    rating: int


@dataclass
class RouteHealthCheckResult:
    """Represent a route health check result."""

    data: RouteHealthCheckResultDataType = field(repr=False)
    num_neighbors: int = field(init=False)
    rating: int = field(init=False)
    failed_pings_to_target: int | None = field(init=False)
    failed_pings_to_source: int | None = field(init=False)
    min_power_level_source: PowerLevel | None = field(init=False, default=None)
    min_power_level_target: PowerLevel | None = field(init=False, default=None)

    def __post_init__(self) -> None:
        """Post initialize."""
        self.num_neighbors = self.data["numNeighbors"]
        self.rating = self.data["rating"]
        self.failed_pings_to_target = self.data.get("failedPingsToTarget")
        self.failed_pings_to_source = self.data.get("failedPingsToSource")
        if (min_power_level_source := self.data.get("minPowerlevelSource")) is not None:
            self.min_power_level_source = PowerLevel(min_power_level_source)
        if (min_power_level_target := self.data.get("minPowerlevelTarget")) is not None:
            self.min_power_level_target = PowerLevel(min_power_level_target)


@dataclass
class RouteHealthCheckSummary:
    """Represent a route health check summary update."""

    data: RouteHealthCheckSummaryDataType = field(repr=False)
    rating: int = field(init=False)
    results: list[RouteHealthCheckResult] = field(init=False)

    def __post_init__(self) -> None:
        """Post initialize."""
        self.rating = self.data["rating"]
        self.results = [RouteHealthCheckResult(r) for r in self.data.get("results", [])]


@dataclass
class TestPowerLevelProgress:
    """Class to represent a test power level progress update."""

    acknowledged: int
    total: int


@dataclass
class CheckHealthProgress:
    """Represent a check lifeline/route health progress update."""

    rounds: int
    total_rounds: int
    last_rating: int
    last_result: LifelineHealthCheckResult | RouteHealthCheckResult