File: test_cover.py

package info (click to toggle)
python-elmax-api 0.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 324 kB
  • sloc: python: 1,378; makefile: 20
file content (120 lines) | stat: -rw-r--r-- 4,664 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""Test the actuator functionalities."""
import asyncio
import time

import pytest

from elmax_api.http import GenericElmax
from elmax_api.model.command import CoverCommand
from elmax_api.model.cover import Cover
from elmax_api.model.cover_status import CoverStatus
from elmax_api.model.panel import PanelStatus, PanelEntry
from tests import client, LOCAL_TEST


async def wait_for_cover_status(
    client: GenericElmax,
    endpoint_id: str,
    status: CoverStatus,
    timeout: float,
    *,
    poll_interval: float = 2.0,
) -> bool:
    """Poll an endpoint until its first cover reports the given status.

    Args:
        client: API client used to query the endpoint status.
        endpoint_id: Identifier of the endpoint to poll.
        status: Cover status to wait for.
        timeout: Maximum time to keep polling, in seconds.
        poll_interval: Pause between two consecutive polls, in seconds.

    Returns:
        True as soon as the expected status is observed, False if the
        timeout elapses first.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        cur_status = await client.get_endpoint_status(endpoint_id=endpoint_id)
        cover: Cover = cur_status.covers[0]
        if cover.status == status:
            # Return immediately: a match must not pay an extra polling delay.
            return True

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))
    return False


async def wait_for_cover_position(
    client: GenericElmax,
    endpoint_id: str,
    position: int,
    timeout: float,
    *,
    poll_interval: float = 2.0,
) -> bool:
    """Poll an endpoint until its first cover reaches the given position.

    Args:
        client: API client used to query the endpoint status.
        endpoint_id: Identifier of the endpoint to poll.
        position: Cover position to wait for.
        timeout: Maximum time to keep polling, in seconds.
        poll_interval: Pause between two consecutive polls, in seconds.

    Returns:
        True as soon as the expected position is observed.

    Raises:
        TimeoutError: If the position is not observed before the timeout
            elapses.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout

    while time.monotonic() < deadline:
        cur_status = await client.get_endpoint_status(endpoint_id=endpoint_id)
        cover: Cover = cur_status.covers[0]
        if cover.position == position:
            # Return immediately: a match must not pay an extra polling delay.
            return True

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        await asyncio.sleep(min(poll_interval, remaining))

    print("TIMEOUT WHILE WAITING FOR COVER MOVING")
    raise TimeoutError(
        f"Cover on endpoint {endpoint_id} did not reach position {position} within {timeout}s"
    )


def setup_module(module):
    """Pick the panel the tests of this module will run against.

    When LOCAL_TEST is falsy, the control panels are listed, the online ones
    are kept, and the first online panel whose status reports at least one
    cover is set as the client's current panel. If no such panel exists the
    module is skipped. When LOCAL_TEST is truthy nothing is done.
    """
    if LOCAL_TEST:
        return

    all_panels = asyncio.run(client.list_control_panels())
    reachable_panels = [entry for entry in all_panels if entry.online]
    assert len(reachable_panels) > 0

    # Select the first online panel which has covers
    for candidate in reachable_panels:
        candidate_status = asyncio.run(client.get_panel_status(candidate.hash))
        if len(candidate_status.covers) > 0:
            client.set_current_panel(panel_id=candidate.hash)
            return

    pytest.skip("No panel found to run this test set.")

@pytest.mark.asyncio
async def test_open_close():
    """Send every cover to its opposite end position and wait for it to arrive.

    For each cover, UP is issued when its position is 0 and DOWN otherwise;
    the test then waits for position 100 or 0 respectively. The whole cycle
    runs twice so each cover is moved in both directions. The test is
    skipped when the current panel reports no covers.
    """
    # Do this twice so we toggle every cover up->down and down->up
    for _ in range(2):
        # Retrieve its status
        panel = await client.get_current_panel_status()  # type: PanelStatus
        assert isinstance(panel, PanelStatus)

        if len(panel.covers) < 1:
            pytest.skip(f"No covers found on the specified panel: {client.current_panel_id}")

        # Snapshot the current position of every cover to decide which way to move it
        cover_position = {cover.endpoint_id: cover.position for cover in panel.covers}

        # Toggle all the covers
        tasks = []
        for endpoint_id, curr_position in cover_position.items():
            command = CoverCommand.UP if curr_position == 0 else CoverCommand.DOWN
            await client.execute_command(endpoint_id=endpoint_id, command=command)
            expected_position = 100 if command == CoverCommand.UP else 0
            waiter = wait_for_cover_position(
                client=client, endpoint_id=endpoint_id, position=expected_position, timeout=30.0
            )
            # asyncio.wait() no longer accepts bare coroutines (TypeError on
            # Python 3.11+), so each waiter is wrapped in a Task.
            tasks.append(asyncio.create_task(waiter))

        # Ensure all the covers moved correctly
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        # FIRST_EXCEPTION returns early on failure: stop the remaining waiters
        # before reporting, so no task outlives the test.
        for leftover in pending:
            leftover.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        if any(t.exception() is not None for t in done):
            pytest.fail("One of the covers failed")


@pytest.mark.asyncio
async def test_up_down_states():
    """Command every cover and check it reports the matching UP/DOWN status.

    For each cover, UP is issued when its position is 0 and DOWN otherwise;
    the test then waits for the cover status to become CoverStatus.UP or
    CoverStatus.DOWN respectively. The whole cycle runs twice so each cover
    is driven in both directions. The test is skipped when the current panel
    reports no covers.
    """
    # Do this twice so we toggle every cover up->down and down->up
    for _ in range(2):
        # Retrieve its status
        panel = await client.get_current_panel_status()  # type: PanelStatus
        assert isinstance(panel, PanelStatus)

        # asyncio.wait() raises ValueError on an empty collection, so bail out
        # early when there is nothing to test.
        if len(panel.covers) < 1:
            pytest.skip(f"No covers found on the specified panel: {client.current_panel_id}")

        # Snapshot the current position of every cover to decide which way to move it
        cover_position = {cover.endpoint_id: cover.position for cover in panel.covers}

        # Toggle all the covers
        tasks = []
        for endpoint_id, curr_position in cover_position.items():
            command = CoverCommand.UP if curr_position == 0 else CoverCommand.DOWN
            await client.execute_command(endpoint_id=endpoint_id, command=command)
            expected_status = CoverStatus.UP if command == CoverCommand.UP else CoverStatus.DOWN
            waiter = wait_for_cover_status(
                client=client, endpoint_id=endpoint_id, status=expected_status, timeout=4.0
            )
            # asyncio.wait() no longer accepts bare coroutines (TypeError on
            # Python 3.11+), so each waiter is wrapped in a Task.
            tasks.append(asyncio.create_task(waiter))

        # Ensure all the covers switched correctly
        done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        # FIRST_EXCEPTION returns early on failure: stop the remaining waiters
        # before reporting, so no task outlives the test.
        for leftover in pending:
            leftover.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

        for t in done:
            # The waiter signals a timeout by returning False rather than
            # raising, so the result has to be checked as well.
            if t.exception() is not None or not t.result():
                pytest.fail("One of the covers failed")