1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
|
"""Test ecowitt sensor module."""
from typing import Any
import pytest
from pytest_aiohttp import AiohttpClient
from aioecowitt.sensor import EcoWittSensor, EcoWittSensorTypes
from aioecowitt.server import EcoWittListener
from .const import GW2000A_V3_DATA
def test_update_listener() -> None:
    """Check that update callbacks fire only when the sensor value changes."""
    sensor = EcoWittSensor(
        "test",
        "test",
        EcoWittSensorTypes.TEMPERATURE_C,
        "test",
    )
    # One entry is appended per callback invocation; emptiness means "silent".
    notifications: list[bool] = []

    def record_update() -> None:
        """Note that the sensor invoked its update callback."""
        notifications.append(True)

    sensor.update_cb.append(record_update)

    # The first update is expected to notify the listener.
    sensor.update_value(10, 0, 0)
    assert notifications

    # Repeating the very same update must not notify again.
    notifications.clear()
    sensor.update_value(10, 0, 0)
    assert not notifications

    # An update carrying a different value must notify once more.
    sensor.update_value(11, 0, 0)
    assert notifications
@pytest.mark.parametrize(
    ("sensor_key", "expected_value", "request_data"),
    [
        ("heap", GW2000A_V3_DATA["heap"], GW2000A_V3_DATA),
        ("vpd", 0.091, {**GW2000A_V3_DATA, "vpd": "0.091"}),
    ],
)
async def test_sensor(
    ecowitt_server: EcoWittListener,
    ecowitt_http: AiohttpClient,
    sensor_key: str,
    expected_value: Any,
    request_data: dict[str, Any],
) -> None:
    """Test that a posted payload yields a sensor with the expected value.

    The payload is posted to ``/``; the sensor whose key equals
    ``sensor_key`` is captured through the listener's new-sensor callbacks
    and its value is compared with ``expected_value``.

    Args:
        ecowitt_server: Listener whose ``new_sensor_cb`` list receives the
            capturing callback (presumably a fixture from conftest).
        ecowitt_http: HTTP client used to post the payload (presumably a
            fixture bound to the listener's app -- confirm in conftest).
        sensor_key: Key of the sensor under test.
        expected_value: Value the captured sensor must report.
        request_data: Form data sent in the POST request.
    """
    target_sensor: EcoWittSensor | None = None

    def on_change(sensor: EcoWittSensor) -> None:
        """Capture the sensor matching the key under test."""
        nonlocal target_sensor
        if sensor.key == sensor_key:
            target_sensor = sensor

    ecowitt_server.new_sensor_cb.append(on_change)

    resp = await ecowitt_http.post("/", data=request_data)
    assert resp.status == 200, f"unexpected HTTP status {resp.status}"
    text = await resp.text()
    assert text == "OK", f"unexpected response body {text!r}"

    # Check identity rather than truthiness: the question is whether the
    # callback ever delivered the sensor, not whether the object is truthy.
    assert target_sensor is not None, (
        f"no new-sensor callback received for key {sensor_key!r}"
    )
    assert target_sensor.value == expected_value
|