File: test_dnd.py

package info (click to toggle)
python-roborock 4.12.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,480 kB
  • sloc: python: 16,602; makefile: 17; sh: 6
file content (178 lines) | stat: -rw-r--r-- 5,915 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""Tests for the DoNotDisturbTrait class."""

from unittest.mock import AsyncMock, call

import pytest

from roborock.data import DnDTimer
from roborock.devices.device import RoborockDevice
from roborock.devices.traits.v1.do_not_disturb import DoNotDisturbTrait
from roborock.exceptions import RoborockException
from roborock.roborock_typing import RoborockCommand


@pytest.fixture
async def dnd_trait(device: RoborockDevice) -> DoNotDisturbTrait:
    """Create a DoNotDisturbTrait instance with mocked dependencies."""
    assert device.v1_properties
    assert device.v1_properties.dnd
    return device.v1_properties.dnd


@pytest.fixture
def sample_dnd_timer() -> DnDTimer:
    """Create a sample DnDTimer for testing."""
    return DnDTimer(
        start_hour=22,
        start_minute=0,
        end_hour=8,
        end_minute=0,
        enabled=1,
    )


async def test_get_dnd_timer_success(
    dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
    """Test successfully getting DnD timer settings."""
    # Setup mock to return the sample DnD timer
    mock_rpc_channel.send_command.return_value = sample_dnd_timer.as_dict()

    # Call the method
    await dnd_trait.refresh()

    # Verify the result
    assert dnd_trait.start_hour == 22
    assert dnd_trait.start_minute == 0
    assert dnd_trait.end_hour == 8
    assert dnd_trait.end_minute == 0
    assert dnd_trait.enabled == 1
    assert dnd_trait.is_on

    # Verify the RPC call was made correctly
    mock_rpc_channel.send_command.assert_called_once_with(RoborockCommand.GET_DND_TIMER)


async def test_get_dnd_timer_disabled(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
    """Test getting DnD timer when it's disabled."""
    disabled_timer = DnDTimer(
        start_hour=22,
        start_minute=0,
        end_hour=8,
        end_minute=0,
        enabled=0,
    )
    mock_rpc_channel.send_command.return_value = disabled_timer.as_dict()

    await dnd_trait.refresh()

    assert dnd_trait.enabled == 0
    assert not dnd_trait.is_on
    mock_rpc_channel.send_command.assert_called_once_with(RoborockCommand.GET_DND_TIMER)


async def test_set_dnd_timer_success(
    dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
    """Test successfully setting DnD timer settings."""
    mock_rpc_channel.send_command.side_effect = [
        # Response for SET_DND_TIMER
        {},
        # Response for GET_DND_TIMER after updating
        {
            "startHour": 22,
            "startMinute": 0,
            "endHour": 8,
            "endMinute": 0,
            "enabled": 1,
        },
    ]

    # Call the method
    await dnd_trait.set_dnd_timer(sample_dnd_timer)

    # Verify the RPC call was made correctly with dataclass converted to dict

    expected_params = [22, 0, 8, 0]
    assert mock_rpc_channel.send_command.mock_calls == [
        call(RoborockCommand.SET_DND_TIMER, params=expected_params),
        call(RoborockCommand.GET_DND_TIMER),
    ]

    # Verify the trait state is updated
    assert dnd_trait.enabled == 1
    assert dnd_trait.is_on
    assert dnd_trait.start_hour == 22
    assert dnd_trait.start_minute == 0
    assert dnd_trait.end_hour == 8
    assert dnd_trait.end_minute == 0


async def test_clear_dnd_timer_success(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
    """Test successfully clearing DnD timer settings."""
    mock_rpc_channel.send_command.side_effect = [
        # Response for CLOSE_DND_TIMER
        {},
        # Response for GET_DND_TIMER after clearing
        {
            "startHour": 0,
            "startMinute": 0,
            "endHour": 0,
            "endMinute": 0,
            "enabled": 0,
        },
    ]

    # Call the method
    await dnd_trait.clear_dnd_timer()

    # Verify the RPC call was made correctly
    assert mock_rpc_channel.send_command.mock_calls == [
        call(RoborockCommand.CLOSE_DND_TIMER),
        call(RoborockCommand.GET_DND_TIMER),
    ]

    # Verify the trait state is updated
    assert dnd_trait.enabled == 0
    assert not dnd_trait.is_on
    assert dnd_trait.start_hour == 0
    assert dnd_trait.start_minute == 0
    assert dnd_trait.end_hour == 0
    assert dnd_trait.end_minute == 0


async def test_get_dnd_timer_propagates_exception(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
    """Test that exceptions from RPC channel are propagated in get_dnd_timer."""

    # Setup mock to raise an exception
    mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")

    # Verify the exception is propagated
    with pytest.raises(RoborockException, match="Communication error"):
        await dnd_trait.refresh()


async def test_set_dnd_timer_propagates_exception(
    dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
    """Test that exceptions from RPC channel are propagated in set_dnd_timer."""
    from roborock.exceptions import RoborockException

    # Setup mock to raise an exception
    mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")

    # Verify the exception is propagated
    with pytest.raises(RoborockException, match="Communication error"):
        await dnd_trait.set_dnd_timer(sample_dnd_timer)


async def test_clear_dnd_timer_propagates_exception(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
    """Test that exceptions from RPC channel are propagated in clear_dnd_timer."""
    from roborock.exceptions import RoborockException

    # Setup mock to raise an exception
    mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")

    # Verify the exception is propagated
    with pytest.raises(RoborockException, match="Communication error"):
        await dnd_trait.clear_dnd_timer()