File: test_a01_channel.py

package info (click to toggle)
python-roborock 4.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites:
  • size: 2,476 kB
  • sloc: python: 16,570; makefile: 17; sh: 6
file content (53 lines) | stat: -rw-r--r-- 1,664 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
"""Tests for the a01_channel."""

from typing import Any

import pytest

from roborock.devices.rpc.a01_channel import send_decoded_command
from roborock.protocols.a01_protocol import encode_mqtt_payload
from roborock.roborock_message import (
    RoborockDyadDataProtocol,
    RoborockMessage,
    RoborockMessageProtocol,
)
from tests.fixtures.channel_fixtures import FakeChannel


@pytest.fixture
def mock_mqtt_channel() -> FakeChannel:
    """Provide a newly constructed FakeChannel to stand in for the MQTT channel."""
    channel = FakeChannel()
    return channel


async def test_id_query(mock_mqtt_channel: FakeChannel):
    """Check that an ID_QUERY command yields the decoded response values."""
    # Stage the reply first: an RPC response whose payload carries the two
    # datapoint values, encoded with an identity value encoder.
    encoded_reply = encode_mqtt_payload(
        {
            RoborockDyadDataProtocol.WARM_LEVEL: 101,
            RoborockDyadDataProtocol.POWER: 75,
        },
        value_encoder=lambda value: value,
    )
    mock_mqtt_channel.response_queue.append(
        RoborockMessage(
            protocol=RoborockMessageProtocol.RPC_RESPONSE,
            payload=encoded_reply.payload,
            version=encoded_reply.version,
        )
    )

    # The query names the same two datapoints that the staged reply contains.
    query: dict[RoborockDyadDataProtocol, Any] = {
        RoborockDyadDataProtocol.ID_QUERY: [
            RoborockDyadDataProtocol.WARM_LEVEL,
            RoborockDyadDataProtocol.POWER,
        ],
    }

    # Exercise the function under test against the fake channel.
    decoded = await send_decoded_command(mock_mqtt_channel, query)  # type: ignore[call-overload]

    # The decoded mapping must match the values placed in the staged reply.
    assert decoded == {
        RoborockDyadDataProtocol.WARM_LEVEL: 101,
        RoborockDyadDataProtocol.POWER: 75,
    }
    # The command must have awaited publish and subscribe exactly once each.
    mock_mqtt_channel.publish.assert_awaited_once()
    mock_mqtt_channel.subscribe.assert_awaited_once()