1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
|
"""Tests for the DoNotDisturbTrait class."""
from unittest.mock import AsyncMock, call
import pytest
from roborock.data import DnDTimer
from roborock.devices.device import RoborockDevice
from roborock.devices.traits.v1.do_not_disturb import DoNotDisturbTrait
from roborock.exceptions import RoborockException
from roborock.roborock_typing import RoborockCommand
@pytest.fixture
async def dnd_trait(device: RoborockDevice) -> DoNotDisturbTrait:
"""Create a DoNotDisturbTrait instance with mocked dependencies."""
assert device.v1_properties
assert device.v1_properties.dnd
return device.v1_properties.dnd
@pytest.fixture
def sample_dnd_timer() -> DnDTimer:
"""Create a sample DnDTimer for testing."""
return DnDTimer(
start_hour=22,
start_minute=0,
end_hour=8,
end_minute=0,
enabled=1,
)
async def test_get_dnd_timer_success(
dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
"""Test successfully getting DnD timer settings."""
# Setup mock to return the sample DnD timer
mock_rpc_channel.send_command.return_value = sample_dnd_timer.as_dict()
# Call the method
await dnd_trait.refresh()
# Verify the result
assert dnd_trait.start_hour == 22
assert dnd_trait.start_minute == 0
assert dnd_trait.end_hour == 8
assert dnd_trait.end_minute == 0
assert dnd_trait.enabled == 1
assert dnd_trait.is_on
# Verify the RPC call was made correctly
mock_rpc_channel.send_command.assert_called_once_with(RoborockCommand.GET_DND_TIMER)
async def test_get_dnd_timer_disabled(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
"""Test getting DnD timer when it's disabled."""
disabled_timer = DnDTimer(
start_hour=22,
start_minute=0,
end_hour=8,
end_minute=0,
enabled=0,
)
mock_rpc_channel.send_command.return_value = disabled_timer.as_dict()
await dnd_trait.refresh()
assert dnd_trait.enabled == 0
assert not dnd_trait.is_on
mock_rpc_channel.send_command.assert_called_once_with(RoborockCommand.GET_DND_TIMER)
async def test_set_dnd_timer_success(
dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
"""Test successfully setting DnD timer settings."""
mock_rpc_channel.send_command.side_effect = [
# Response for SET_DND_TIMER
{},
# Response for GET_DND_TIMER after updating
{
"startHour": 22,
"startMinute": 0,
"endHour": 8,
"endMinute": 0,
"enabled": 1,
},
]
# Call the method
await dnd_trait.set_dnd_timer(sample_dnd_timer)
# Verify the RPC call was made correctly with dataclass converted to dict
expected_params = [22, 0, 8, 0]
assert mock_rpc_channel.send_command.mock_calls == [
call(RoborockCommand.SET_DND_TIMER, params=expected_params),
call(RoborockCommand.GET_DND_TIMER),
]
# Verify the trait state is updated
assert dnd_trait.enabled == 1
assert dnd_trait.is_on
assert dnd_trait.start_hour == 22
assert dnd_trait.start_minute == 0
assert dnd_trait.end_hour == 8
assert dnd_trait.end_minute == 0
async def test_clear_dnd_timer_success(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
"""Test successfully clearing DnD timer settings."""
mock_rpc_channel.send_command.side_effect = [
# Response for CLOSE_DND_TIMER
{},
# Response for GET_DND_TIMER after clearing
{
"startHour": 0,
"startMinute": 0,
"endHour": 0,
"endMinute": 0,
"enabled": 0,
},
]
# Call the method
await dnd_trait.clear_dnd_timer()
# Verify the RPC call was made correctly
assert mock_rpc_channel.send_command.mock_calls == [
call(RoborockCommand.CLOSE_DND_TIMER),
call(RoborockCommand.GET_DND_TIMER),
]
# Verify the trait state is updated
assert dnd_trait.enabled == 0
assert not dnd_trait.is_on
assert dnd_trait.start_hour == 0
assert dnd_trait.start_minute == 0
assert dnd_trait.end_hour == 0
assert dnd_trait.end_minute == 0
async def test_get_dnd_timer_propagates_exception(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
"""Test that exceptions from RPC channel are propagated in get_dnd_timer."""
# Setup mock to raise an exception
mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")
# Verify the exception is propagated
with pytest.raises(RoborockException, match="Communication error"):
await dnd_trait.refresh()
async def test_set_dnd_timer_propagates_exception(
dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock, sample_dnd_timer: DnDTimer
) -> None:
"""Test that exceptions from RPC channel are propagated in set_dnd_timer."""
from roborock.exceptions import RoborockException
# Setup mock to raise an exception
mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")
# Verify the exception is propagated
with pytest.raises(RoborockException, match="Communication error"):
await dnd_trait.set_dnd_timer(sample_dnd_timer)
async def test_clear_dnd_timer_propagates_exception(dnd_trait: DoNotDisturbTrait, mock_rpc_channel: AsyncMock) -> None:
"""Test that exceptions from RPC channel are propagated in clear_dnd_timer."""
from roborock.exceptions import RoborockException
# Setup mock to raise an exception
mock_rpc_channel.send_command.side_effect = RoborockException("Communication error")
# Verify the exception is propagated
with pytest.raises(RoborockException, match="Communication error"):
await dnd_trait.clear_dnd_timer()
|