1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548
|
"""Tests for the V1Channel class.
This test simulates communication across both the MQTT and local connections
and failure modes, ensuring the V1Channel behaves correctly in various scenarios.
"""
import json
import logging
from collections.abc import Iterator
from unittest.mock import AsyncMock, Mock, patch
import pytest
from roborock.data import NetworkInfo, RoborockStateCode, S5MaxStatus, UserData
from roborock.devices.cache import CacheData, InMemoryCache
from roborock.devices.local_channel import LocalSession
from roborock.devices.v1_channel import V1Channel
from roborock.exceptions import RoborockException
from roborock.protocol import (
create_local_decoder,
create_local_encoder,
create_mqtt_decoder,
create_mqtt_encoder,
)
from roborock.protocols.v1_protocol import MapResponse, SecurityData
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from roborock.roborock_typing import RoborockCommand
from .. import mock_data
from ..conftest import FakeChannel
USER_DATA = UserData.from_dict(mock_data.USER_DATA)
TEST_DEVICE_UID = "abc123"
TEST_LOCAL_KEY = "local_key"
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte")
TEST_HOST = mock_data.TEST_LOCAL_API_HOST
# Test messages for V1 protocol
TEST_REQUEST = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_REQUEST,
payload=json.dumps({"dps": {"101": json.dumps({"id": 12346, "method": "get_status"})}}).encode(),
)
TEST_RESPONSE = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=json.dumps(
{"dps": {"102": json.dumps({"id": 12346, "result": {"state": RoborockStateCode.cleaning}})}}
).encode(),
)
TEST_RESPONSE_2 = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=json.dumps(
{"dps": {"102": json.dumps({"id": 12347, "result": {"state": RoborockStateCode.cleaning}})}}
).encode(),
)
TEST_NETWORK_INFO_RESPONSE = RoborockMessage(
protocol=RoborockMessageProtocol.RPC_RESPONSE,
payload=json.dumps({"dps": {"102": json.dumps({"id": 12345, "result": mock_data.NETWORK_INFO})}}).encode(),
)
TEST_NETWORKING_INFO = NetworkInfo.from_dict(mock_data.NETWORK_INFO)
# Encoders/Decoders
MQTT_ENCODER = create_mqtt_encoder(TEST_LOCAL_KEY)
MQTT_DECODER = create_mqtt_decoder(TEST_LOCAL_KEY)
LOCAL_ENCODER = create_local_encoder(TEST_LOCAL_KEY)
LOCAL_DECODER = create_local_decoder(TEST_LOCAL_KEY)
@pytest.fixture(name="mock_mqtt_channel")
async def setup_mock_mqtt_channel() -> FakeChannel:
"""Mock MQTT channel for testing."""
channel = FakeChannel()
await channel.connect()
return channel
@pytest.fixture(name="mock_local_channel")
async def setup_mock_local_channel() -> FakeChannel:
"""Mock Local channel for testing."""
return FakeChannel()
@pytest.fixture(name="mock_local_session")
def setup_mock_local_session(mock_local_channel: Mock) -> Mock:
"""Mock Local session factory for testing."""
mock_session = Mock(spec=LocalSession)
mock_session.return_value = mock_local_channel
return mock_session
@pytest.fixture(name="mock_request_id", autouse=True)
def setup_mock_request_id() -> Iterator[None]:
"""Assign sequential request ids for testing."""
next_id = 12345
def fake_next_int(*args) -> int:
nonlocal next_id
id_to_return = next_id
next_id += 1
return id_to_return
with patch("roborock.protocols.v1_protocol.get_next_int", side_effect=fake_next_int):
yield
@pytest.fixture(name="mock_create_map_response_decoder")
def setup_mock_map_decoder() -> Iterator[Mock]:
"""Mock the map response decoder to control its behavior in tests."""
with patch("roborock.devices.v1_channel.create_map_response_decoder") as mock_create_decoder:
yield mock_create_decoder
@pytest.fixture(name="cache")
def cache_fixtures() -> InMemoryCache:
"""Mock cache for testing."""
return InMemoryCache()
@pytest.fixture(name="v1_channel")
def setup_v1_channel(
mock_mqtt_channel: Mock,
mock_local_session: Mock,
mock_create_map_response_decoder: Mock,
cache: InMemoryCache,
) -> V1Channel:
"""Fixture to set up the V1 channel for tests."""
return V1Channel(
device_uid=TEST_DEVICE_UID,
security_data=TEST_SECURITY_DATA,
mqtt_channel=mock_mqtt_channel,
local_session=mock_local_session,
cache=cache,
)
@pytest.fixture(name="warning_caplog")
def setup_warning_caplog(caplog: pytest.LogCaptureFixture) -> pytest.LogCaptureFixture:
"""Fixture to capture warning messages."""
caplog.set_level(logging.WARNING)
return caplog
async def test_v1_channel_subscribe_mqtt_only_success(
v1_channel: V1Channel,
mock_mqtt_channel: FakeChannel,
mock_local_session: Mock,
mock_local_channel: FakeChannel,
) -> None:
"""Test successful subscription with MQTT only (local connection fails)."""
# Setup: MQTT succeeds, local fails
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
mock_local_channel.connect.side_effect = RoborockException("Connection failed")
callback = Mock()
unsub = await v1_channel.subscribe(callback)
# Verify MQTT connection was established
assert mock_mqtt_channel.subscribers
# Verify local connection was attempted but failed
mock_local_session.assert_called_once_with(TEST_HOST)
mock_local_channel.connect.assert_called_once()
# Verify properties
assert v1_channel.is_mqtt_connected
assert not v1_channel.is_local_connected
# Test unsubscribe
unsub()
assert not mock_mqtt_channel.subscribers
async def test_v1_channel_mqtt_disconnected(
v1_channel: V1Channel,
mock_mqtt_channel: FakeChannel,
mock_local_session: Mock,
mock_local_channel: FakeChannel,
) -> None:
"""Test successful subscription with MQTT only (local connection fails)."""
# Setup: MQTT succeeds, local fails
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
mock_local_channel.connect.side_effect = RoborockException("Connection failed")
callback = Mock()
unsub = await v1_channel.subscribe(callback)
# Verify MQTT connection was established
assert mock_mqtt_channel.subscribers
# Verify local connection was attempted but failed
mock_local_session.assert_called_once_with(TEST_HOST)
mock_local_channel.connect.assert_called_once()
# Simulate an MQTT disconnection where the channel is not healthy
mock_mqtt_channel.close()
# Verify properties
assert not v1_channel.is_mqtt_connected
assert not v1_channel.is_local_connected
# Test unsubscribe
unsub()
assert not mock_mqtt_channel.subscribers
async def test_v1_channel_subscribe_local_success(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
mock_local_session: Mock,
) -> None:
"""Test successful subscription with local connections."""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
# Mock network info retrieval
callback = Mock()
unsub = await v1_channel.subscribe(callback)
# Verify local connection was attempted and succeeded
mock_local_session.assert_called_once_with(TEST_HOST)
mock_local_channel.connect.assert_called_once()
# Verify local connection established and not mqtt
assert not mock_mqtt_channel.subscribers
assert mock_local_channel.subscribers
# Verify properties
assert not v1_channel.is_mqtt_connected
assert v1_channel.is_local_connected
# Test unsubscribe cleans up both
unsub()
assert not mock_mqtt_channel.subscribers
assert not mock_local_channel.subscribers
async def test_v1_channel_subscribe_already_connected_error(v1_channel: V1Channel, mock_mqtt_channel: Mock) -> None:
"""Test error when trying to subscribe when already connected."""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
# First subscription succeeds
await v1_channel.subscribe(Mock())
# Second subscription should fail
with pytest.raises(ValueError, match="Only one subscription allowed at a time"):
await v1_channel.subscribe(Mock())
async def test_v1_channel_local_connection_warning_logged(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
warning_caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that local connection failures are logged as warnings."""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
mock_local_channel.connect.side_effect = RoborockException("Local connection failed")
await v1_channel.subscribe(Mock())
assert "Could not establish local connection for device abc123" in warning_caplog.text
assert "Local connection failed" in warning_caplog.text
async def test_v1_channel_send_command_local_preferred(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
) -> None:
"""Test command sending prefers local connection when available."""
# Establish connections
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
await v1_channel.subscribe(Mock())
# Send command
mock_local_channel.response_queue.append(TEST_RESPONSE)
result = await v1_channel.rpc_channel.send_command(
RoborockCommand.GET_STATUS,
response_type=S5MaxStatus,
)
# Verify local response was parsed
assert result.state == RoborockStateCode.cleaning
async def test_v1_channel_send_command_local_fails(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
) -> None:
"""Test case where sending with local connection fails, falling back to MQTT."""
# Establish connections
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
await v1_channel.subscribe(Mock())
# Local command fails
mock_local_channel.publish = Mock()
mock_local_channel.publish.side_effect = RoborockException("Local failed")
# MQTT command succeeds
mock_mqtt_channel.response_queue.append(TEST_RESPONSE)
# Send command
result = await v1_channel.rpc_channel.send_command(
RoborockCommand.GET_STATUS,
response_type=S5MaxStatus,
)
# Verify result
assert result.state == RoborockStateCode.cleaning
# Verify local was attempted
mock_local_channel.publish.assert_called_once()
# Verify MQTT was used
assert mock_mqtt_channel.published_messages
# The last message should be the command we sent
assert mock_mqtt_channel.published_messages[-1].protocol == RoborockMessageProtocol.RPC_REQUEST
async def test_v1_channel_send_decoded_command_mqtt_only(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
) -> None:
"""Test command sending works with MQTT only."""
# Setup: only MQTT connection
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
mock_local_channel.connect.side_effect = RoborockException("No local")
await v1_channel.subscribe(Mock())
# Send command
mock_mqtt_channel.response_queue.append(TEST_RESPONSE)
result = await v1_channel.rpc_channel.send_command(
RoborockCommand.GET_STATUS,
response_type=S5MaxStatus,
)
# Verify only MQTT was used
assert result.state == RoborockStateCode.cleaning
async def test_v1_channel_send_decoded_command_with_params(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
) -> None:
"""Test command sending with parameters."""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
await v1_channel.subscribe(Mock())
# Send command with params
mock_local_channel.response_queue.append(TEST_RESPONSE)
test_params = {"volume": 80}
await v1_channel.rpc_channel.send_command(
RoborockCommand.CHANGE_SOUND_VOLUME,
response_type=S5MaxStatus,
params=test_params,
)
# Verify command was sent with correct params
assert mock_local_channel.published_messages
sent_message = mock_local_channel.published_messages[0]
assert sent_message
assert isinstance(sent_message, RoborockMessage)
assert sent_message.payload
payload = sent_message.payload.decode()
json_data = json.loads(payload)
assert "dps" in json_data
assert "101" in json_data["dps"]
decoded_payload = json.loads(json_data["dps"]["101"])
assert decoded_payload["method"] == "change_sound_volume"
assert decoded_payload["params"] == {"volume": 80}
async def test_v1_channel_networking_info_retrieved_during_connection(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
mock_local_session: Mock,
) -> None:
"""Test that networking information is retrieved during local connection setup."""
# Setup: MQTT returns network info when requested
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
# Subscribe - this should trigger network info retrieval for local connection
await v1_channel.subscribe(Mock())
# Verify local connection was esablished
assert v1_channel.is_local_connected
# Verify network info was requested via MQTT
assert mock_mqtt_channel.published_messages
# Verify local session was created with the correct IP
mock_local_session.assert_called_once_with(mock_data.NETWORK_INFO["ip"])
async def test_v1_channel_networking_info_cached_during_connection(
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
mock_local_session: Mock,
) -> None:
"""Test that networking information is cached and reused on subsequent connections."""
# Create a cache with pre-populated network info
cache_data = CacheData()
cache_data.network_info[TEST_DEVICE_UID] = TEST_NETWORKING_INFO
mock_cache = AsyncMock()
mock_cache.get.return_value = cache_data
mock_cache.set = AsyncMock()
# Create V1Channel with the mock cache
v1_channel = V1Channel(
device_uid=TEST_DEVICE_UID,
security_data=TEST_SECURITY_DATA,
mqtt_channel=mock_mqtt_channel,
local_session=mock_local_session,
cache=mock_cache,
)
# Subscribe - should use cached network info
await v1_channel.subscribe(Mock())
# Verify local connections are established
assert v1_channel.is_local_connected
# Verify network info was NOT requested via MQTT (cache hit)
assert not mock_mqtt_channel.published_messages
assert not mock_local_channel.published_messages
# Verify local session was created with the correct IP from cache
mock_local_session.assert_called_once_with(mock_data.NETWORK_INFO["ip"])
# Verify cache was accessed but not updated (cache hit)
mock_cache.get.assert_called_once()
mock_cache.set.assert_not_called()
# V1Channel edge cases tests
async def test_v1_channel_local_connect_network_info_failure(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
) -> None:
"""Test local connection when network info retrieval fails."""
mock_mqtt_channel.publish_side_effect = RoborockException("Network info failed")
with pytest.raises(RoborockException):
await v1_channel._local_connect()
async def test_v1_channel_local_connect_network_info_failure_fallback_to_cache(
mock_mqtt_channel: FakeChannel,
mock_local_session: Mock,
v1_channel: V1Channel,
cache: InMemoryCache,
) -> None:
"""Test local connection falls back to cache when network info retrieval fails."""
# Create a cache with pre-populated network info
cache_data = CacheData()
cache_data.network_info[TEST_DEVICE_UID] = TEST_NETWORKING_INFO
await cache.set(cache_data)
# Setup: MQTT fails to publish
mock_mqtt_channel.publish_side_effect = RoborockException("Network info failed")
# Attempt local connect, forcing a refresh (prefer_cache=False)
# This should try MQTT, fail, and then fall back to cache
await v1_channel._local_connect(prefer_cache=False)
# Verify local connection was established
assert v1_channel.is_local_connected
# Verify MQTT was attempted (published message)
assert mock_mqtt_channel.published_messages
# Verify local session was created with the correct IP from cache
mock_local_session.assert_called_once_with(TEST_HOST)
async def test_v1_channel_command_encoding_validation(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_local_channel: Mock,
) -> None:
"""Test that command encoding works for different protocols."""
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
await v1_channel.subscribe(Mock())
# Send mqtt command and capture the request
mock_mqtt_channel.response_queue.append(TEST_RESPONSE)
await v1_channel.mqtt_rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50})
assert mock_mqtt_channel.published_messages
mqtt_message = mock_mqtt_channel.published_messages[0]
# Send local command and capture the request
mock_local_channel.response_queue.append(TEST_RESPONSE_2)
await v1_channel.rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50})
assert mock_local_channel.published_messages
local_message = mock_local_channel.published_messages[0]
# Verify both are RoborockMessage instances
assert isinstance(mqtt_message, RoborockMessage)
assert isinstance(local_message, RoborockMessage)
# But they should have different protocols
assert mqtt_message.protocol == RoborockMessageProtocol.RPC_REQUEST
assert local_message.protocol == RoborockMessageProtocol.GENERAL_REQUEST
async def test_v1_channel_send_map_command(
v1_channel: V1Channel,
mock_mqtt_channel: Mock,
mock_create_map_response_decoder: Mock,
) -> None:
"""Test that the map channel can correctly decode a map response."""
# Establish connections
mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
await v1_channel.subscribe(Mock())
# Prepare a mock map response
decompressed_map_data = b"this is the decompressed map data"
request_id = 12346 # from the mock_request_id fixture
# Mock the decoder to return a known response
map_response = MapResponse(request_id=request_id, data=decompressed_map_data)
mock_create_map_response_decoder.return_value.return_value = map_response
# The actual message content doesn't matter as much since the decoder is mocked
map_response_message = RoborockMessage(
protocol=RoborockMessageProtocol.MAP_RESPONSE,
payload=b"dummy_payload",
)
mock_mqtt_channel.response_queue.append(map_response_message)
# Send the command and get the result
result = await v1_channel.map_rpc_channel.send_command(RoborockCommand.GET_MAP_V1)
# Verify the result is the data from our mocked decoder
assert result == decompressed_map_data
|