1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
"""Test the actuator functionalities."""
import asyncio
import time
import pytest
from elmax_api.http import GenericElmax
from elmax_api.model.command import CoverCommand
from elmax_api.model.cover import Cover
from elmax_api.model.cover_status import CoverStatus
from elmax_api.model.panel import PanelStatus, PanelEntry
from tests import client, LOCAL_TEST
async def wait_for_cover_status(client: GenericElmax, endpoint_id: str, status: CoverStatus, timeout: float,
                                poll_interval: float = 2.0) -> bool:
    """Poll an endpoint until its cover reports the expected status.

    Args:
        client: Client used to query the endpoint via ``get_endpoint_status``.
        endpoint_id: Identifier of the endpoint to poll.
        status: Cover status to wait for.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to wait between two consecutive polls.

    Returns:
        True as soon as the cover reports ``status``; False if ``timeout``
        elapses first.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        cur_status = await client.get_endpoint_status(endpoint_id=endpoint_id)
        # NOTE(review): assumes the first entry of `covers` is the cover that
        # belongs to `endpoint_id` - confirm against get_endpoint_status().
        cover: Cover = cur_status.covers[0]
        if cover.status == status:
            # Report success right away instead of sleeping one more interval.
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))
    return False
async def wait_for_cover_position(client: GenericElmax, endpoint_id: str, position: int, timeout: float,
                                  poll_interval: float = 2.0) -> bool:
    """Poll an endpoint until its cover reaches the expected position.

    Args:
        client: Client used to query the endpoint via ``get_endpoint_status``.
        endpoint_id: Identifier of the endpoint to poll.
        position: Cover position to wait for.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to wait between two consecutive polls.

    Returns:
        True as soon as the cover reports ``position``.

    Raises:
        TimeoutError: If ``timeout`` elapses before the position is reached.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        cur_status = await client.get_endpoint_status(endpoint_id=endpoint_id)
        # NOTE(review): assumes the first entry of `covers` is the cover that
        # belongs to `endpoint_id` - confirm against get_endpoint_status().
        cover: Cover = cur_status.covers[0]
        if cover.position == position:
            # Report success right away instead of sleeping one more interval.
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))
    print("TIMEOUT WHILE WAITING FOR COVER MOVING")
    raise TimeoutError(
        f"Cover {endpoint_id} did not reach position {position} within {timeout} seconds"
    )
def setup_module(module):
    """Select the panel used by this test module.

    Nothing is done when LOCAL_TEST is set. Otherwise the first online
    control panel that exposes at least one cover becomes the client's
    current panel; if no such panel exists the whole module is skipped.
    """
    if LOCAL_TEST:
        return

    all_panels = asyncio.run(client.list_control_panels())
    online_panels = [entry for entry in all_panels if entry.online]
    assert len(online_panels) > 0

    # Pick the first online panel which has covers
    for candidate in online_panels:
        status = asyncio.run(client.get_panel_status(candidate.hash))
        if len(status.covers) > 0:
            client.set_current_panel(panel_id=candidate.hash)
            return

    # No online panel exposes a cover
    pytest.skip("No panel found to run this test set.")
@pytest.mark.asyncio
async def test_open_close():
    """Drive every cover to the opposite end position and back again.

    Each cover currently at position 0 is sent UP (expected position 100);
    every other cover is sent DOWN (expected position 0). The test fails if
    any cover does not reach its expected position within the timeout.
    """
    # Do this twice so we toggle every cover up->down and down->up
    for _ in range(2):
        # Retrieve the panel status
        panel = await client.get_current_panel_status()  # type: PanelStatus
        assert isinstance(panel, PanelStatus)
        if len(panel.covers) < 1:
            pytest.skip(f"No covers found on the specified panel: {client.current_panel_id}")

        # Snapshot the current position of every cover
        cover_position = {cover.endpoint_id: cover.position for cover in panel.covers}

        # Toggle all the covers
        tasks = []
        for endpoint_id, curr_position in cover_position.items():
            command = CoverCommand.UP if curr_position == 0 else CoverCommand.DOWN
            await client.execute_command(endpoint_id=endpoint_id, command=command)
            expected_position = 100 if command == CoverCommand.UP else 0
            waiter = wait_for_cover_position(
                client=client, endpoint_id=endpoint_id, position=expected_position, timeout=30.0
            )
            # asyncio.wait() only accepts tasks/futures: bare coroutines were
            # deprecated in Python 3.8 and are rejected since Python 3.11.
            tasks.append(asyncio.create_task(waiter))

        # Ensure all the covers moved correctly
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        # Waiters still running after the first failure must not outlive the test.
        for unfinished in pending:
            unfinished.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        for finished in done:
            if finished.exception():
                pytest.fail("One of the covers failed")
@pytest.mark.asyncio
async def test_up_down_states():
    """Check that every cover reports the UP/DOWN status after a command.

    Each cover currently at position 0 is sent UP, every other cover is sent
    DOWN, and the matching CoverStatus is expected to show up within the
    timeout.
    """
    # Do this twice so we toggle every cover up->down and down->up
    for _ in range(2):
        # Retrieve the panel status
        panel = await client.get_current_panel_status()  # type: PanelStatus
        assert isinstance(panel, PanelStatus)
        # asyncio.wait() raises ValueError on an empty task list, so skip
        # explicitly, as test_open_close does.
        if len(panel.covers) < 1:
            pytest.skip(f"No covers found on the specified panel: {client.current_panel_id}")

        # Snapshot the current position of every cover
        cover_position = {cover.endpoint_id: cover.position for cover in panel.covers}

        # Toggle all the covers
        tasks = []
        for endpoint_id, curr_position in cover_position.items():
            command = CoverCommand.UP if curr_position == 0 else CoverCommand.DOWN
            await client.execute_command(endpoint_id=endpoint_id, command=command)
            expected_status = CoverStatus.UP if command == CoverCommand.UP else CoverStatus.DOWN
            waiter = wait_for_cover_status(
                client=client, endpoint_id=endpoint_id, status=expected_status, timeout=4.0
            )
            # asyncio.wait() only accepts tasks/futures: bare coroutines were
            # deprecated in Python 3.8 and are rejected since Python 3.11.
            tasks.append(asyncio.create_task(waiter))

        # Ensure all the covers switched correctly
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        # Waiters still running after the first failure must not outlive the test.
        for unfinished in pending:
            unfinished.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        for finished in done:
            if finished.exception():
                pytest.fail("One of the covers failed")
            # wait_for_cover_status() signals a timeout by returning False,
            # not by raising, so the result has to be checked explicitly.
            if not finished.result():
                pytest.fail("One of the covers did not reach the expected status in time")
|