File: test_v1_channel.py

package info (click to toggle)
python-roborock 3.8.5-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,912 kB
  • sloc: python: 14,982; makefile: 17
file content (548 lines) | stat: -rw-r--r-- 19,000 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
"""Tests for the V1Channel class.

This test simulates communication across both the MQTT and local connections
and failure modes, ensuring the V1Channel behaves correctly in various scenarios.
"""

import json
import logging
from collections.abc import Iterator
from unittest.mock import AsyncMock, Mock, patch

import pytest

from roborock.data import NetworkInfo, RoborockStateCode, S5MaxStatus, UserData
from roborock.devices.cache import CacheData, InMemoryCache
from roborock.devices.local_channel import LocalSession
from roborock.devices.v1_channel import V1Channel
from roborock.exceptions import RoborockException
from roborock.protocol import (
    create_local_decoder,
    create_local_encoder,
    create_mqtt_decoder,
    create_mqtt_encoder,
)
from roborock.protocols.v1_protocol import MapResponse, SecurityData
from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol
from roborock.roborock_typing import RoborockCommand

from .. import mock_data
from ..conftest import FakeChannel

USER_DATA = UserData.from_dict(mock_data.USER_DATA)
TEST_DEVICE_UID = "abc123"
TEST_LOCAL_KEY = "local_key"
TEST_SECURITY_DATA = SecurityData(endpoint="test_endpoint", nonce=b"test_nonce_16byte")
TEST_HOST = mock_data.TEST_LOCAL_API_HOST


# Test messages for V1 protocol
TEST_REQUEST = RoborockMessage(
    protocol=RoborockMessageProtocol.RPC_REQUEST,
    payload=json.dumps({"dps": {"101": json.dumps({"id": 12346, "method": "get_status"})}}).encode(),
)
TEST_RESPONSE = RoborockMessage(
    protocol=RoborockMessageProtocol.RPC_RESPONSE,
    payload=json.dumps(
        {"dps": {"102": json.dumps({"id": 12346, "result": {"state": RoborockStateCode.cleaning}})}}
    ).encode(),
)
TEST_RESPONSE_2 = RoborockMessage(
    protocol=RoborockMessageProtocol.RPC_RESPONSE,
    payload=json.dumps(
        {"dps": {"102": json.dumps({"id": 12347, "result": {"state": RoborockStateCode.cleaning}})}}
    ).encode(),
)
TEST_NETWORK_INFO_RESPONSE = RoborockMessage(
    protocol=RoborockMessageProtocol.RPC_RESPONSE,
    payload=json.dumps({"dps": {"102": json.dumps({"id": 12345, "result": mock_data.NETWORK_INFO})}}).encode(),
)

TEST_NETWORKING_INFO = NetworkInfo.from_dict(mock_data.NETWORK_INFO)

# Encoders/Decoders
MQTT_ENCODER = create_mqtt_encoder(TEST_LOCAL_KEY)
MQTT_DECODER = create_mqtt_decoder(TEST_LOCAL_KEY)
LOCAL_ENCODER = create_local_encoder(TEST_LOCAL_KEY)
LOCAL_DECODER = create_local_decoder(TEST_LOCAL_KEY)


@pytest.fixture(name="mock_mqtt_channel")
async def setup_mock_mqtt_channel() -> FakeChannel:
    """Mock MQTT channel for testing."""
    channel = FakeChannel()
    await channel.connect()
    return channel


@pytest.fixture(name="mock_local_channel")
async def setup_mock_local_channel() -> FakeChannel:
    """Mock Local channel for testing."""
    return FakeChannel()


@pytest.fixture(name="mock_local_session")
def setup_mock_local_session(mock_local_channel: Mock) -> Mock:
    """Mock Local session factory for testing."""
    mock_session = Mock(spec=LocalSession)
    mock_session.return_value = mock_local_channel
    return mock_session


@pytest.fixture(name="mock_request_id", autouse=True)
def setup_mock_request_id() -> Iterator[None]:
    """Assign sequential request ids for testing."""

    next_id = 12345

    def fake_next_int(*args) -> int:
        nonlocal next_id
        id_to_return = next_id
        next_id += 1
        return id_to_return

    with patch("roborock.protocols.v1_protocol.get_next_int", side_effect=fake_next_int):
        yield


@pytest.fixture(name="mock_create_map_response_decoder")
def setup_mock_map_decoder() -> Iterator[Mock]:
    """Mock the map response decoder to control its behavior in tests."""
    with patch("roborock.devices.v1_channel.create_map_response_decoder") as mock_create_decoder:
        yield mock_create_decoder


@pytest.fixture(name="cache")
def cache_fixtures() -> InMemoryCache:
    """Mock cache for testing."""
    return InMemoryCache()


@pytest.fixture(name="v1_channel")
def setup_v1_channel(
    mock_mqtt_channel: Mock,
    mock_local_session: Mock,
    mock_create_map_response_decoder: Mock,
    cache: InMemoryCache,
) -> V1Channel:
    """Fixture to set up the V1 channel for tests."""
    return V1Channel(
        device_uid=TEST_DEVICE_UID,
        security_data=TEST_SECURITY_DATA,
        mqtt_channel=mock_mqtt_channel,
        local_session=mock_local_session,
        cache=cache,
    )


@pytest.fixture(name="warning_caplog")
def setup_warning_caplog(caplog: pytest.LogCaptureFixture) -> pytest.LogCaptureFixture:
    """Fixture to capture warning messages."""
    caplog.set_level(logging.WARNING)
    return caplog


async def test_v1_channel_subscribe_mqtt_only_success(
    v1_channel: V1Channel,
    mock_mqtt_channel: FakeChannel,
    mock_local_session: Mock,
    mock_local_channel: FakeChannel,
) -> None:
    """Test successful subscription with MQTT only (local connection fails)."""
    # Setup: MQTT succeeds, local fails
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    mock_local_channel.connect.side_effect = RoborockException("Connection failed")

    callback = Mock()
    unsub = await v1_channel.subscribe(callback)

    # Verify MQTT connection was established
    assert mock_mqtt_channel.subscribers

    # Verify local connection was attempted but failed
    mock_local_session.assert_called_once_with(TEST_HOST)
    mock_local_channel.connect.assert_called_once()

    # Verify properties
    assert v1_channel.is_mqtt_connected
    assert not v1_channel.is_local_connected

    # Test unsubscribe
    unsub()
    assert not mock_mqtt_channel.subscribers


async def test_v1_channel_mqtt_disconnected(
    v1_channel: V1Channel,
    mock_mqtt_channel: FakeChannel,
    mock_local_session: Mock,
    mock_local_channel: FakeChannel,
) -> None:
    """Test successful subscription with MQTT only (local connection fails)."""
    # Setup: MQTT succeeds, local fails
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    mock_local_channel.connect.side_effect = RoborockException("Connection failed")

    callback = Mock()
    unsub = await v1_channel.subscribe(callback)

    # Verify MQTT connection was established
    assert mock_mqtt_channel.subscribers

    # Verify local connection was attempted but failed
    mock_local_session.assert_called_once_with(TEST_HOST)
    mock_local_channel.connect.assert_called_once()

    # Simulate an MQTT disconnection where the channel is not healthy
    mock_mqtt_channel.close()

    # Verify properties
    assert not v1_channel.is_mqtt_connected
    assert not v1_channel.is_local_connected

    # Test unsubscribe
    unsub()
    assert not mock_mqtt_channel.subscribers


async def test_v1_channel_subscribe_local_success(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
    mock_local_session: Mock,
) -> None:
    """Test successful subscription with local connections."""
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)

    # Mock network info retrieval
    callback = Mock()
    unsub = await v1_channel.subscribe(callback)

    # Verify local connection was attempted and succeeded
    mock_local_session.assert_called_once_with(TEST_HOST)
    mock_local_channel.connect.assert_called_once()

    # Verify local connection established and not mqtt
    assert not mock_mqtt_channel.subscribers
    assert mock_local_channel.subscribers

    # Verify properties
    assert not v1_channel.is_mqtt_connected
    assert v1_channel.is_local_connected

    # Test unsubscribe cleans up both
    unsub()
    assert not mock_mqtt_channel.subscribers
    assert not mock_local_channel.subscribers


async def test_v1_channel_subscribe_already_connected_error(v1_channel: V1Channel, mock_mqtt_channel: Mock) -> None:
    """Test error when trying to subscribe when already connected."""
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)

    # First subscription succeeds
    await v1_channel.subscribe(Mock())

    # Second subscription should fail
    with pytest.raises(ValueError, match="Only one subscription allowed at a time"):
        await v1_channel.subscribe(Mock())


async def test_v1_channel_local_connection_warning_logged(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
    warning_caplog: pytest.LogCaptureFixture,
) -> None:
    """Test that local connection failures are logged as warnings."""
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    mock_local_channel.connect.side_effect = RoborockException("Local connection failed")

    await v1_channel.subscribe(Mock())

    assert "Could not establish local connection for device abc123" in warning_caplog.text
    assert "Local connection failed" in warning_caplog.text


async def test_v1_channel_send_command_local_preferred(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
) -> None:
    """Test command sending prefers local connection when available."""
    # Establish connections
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    await v1_channel.subscribe(Mock())

    # Send command
    mock_local_channel.response_queue.append(TEST_RESPONSE)
    result = await v1_channel.rpc_channel.send_command(
        RoborockCommand.GET_STATUS,
        response_type=S5MaxStatus,
    )

    # Verify local response was parsed
    assert result.state == RoborockStateCode.cleaning


async def test_v1_channel_send_command_local_fails(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
) -> None:
    """Test case where sending with local connection fails, falling back to MQTT."""

    # Establish connections
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    await v1_channel.subscribe(Mock())

    # Local command fails
    mock_local_channel.publish = Mock()
    mock_local_channel.publish.side_effect = RoborockException("Local failed")

    # MQTT command succeeds
    mock_mqtt_channel.response_queue.append(TEST_RESPONSE)

    # Send command
    result = await v1_channel.rpc_channel.send_command(
        RoborockCommand.GET_STATUS,
        response_type=S5MaxStatus,
    )

    # Verify result
    assert result.state == RoborockStateCode.cleaning

    # Verify local was attempted
    mock_local_channel.publish.assert_called_once()

    # Verify MQTT was used
    assert mock_mqtt_channel.published_messages
    # The last message should be the command we sent
    assert mock_mqtt_channel.published_messages[-1].protocol == RoborockMessageProtocol.RPC_REQUEST


async def test_v1_channel_send_decoded_command_mqtt_only(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
) -> None:
    """Test command sending works with MQTT only."""
    # Setup: only MQTT connection
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    mock_local_channel.connect.side_effect = RoborockException("No local")

    await v1_channel.subscribe(Mock())

    # Send command
    mock_mqtt_channel.response_queue.append(TEST_RESPONSE)
    result = await v1_channel.rpc_channel.send_command(
        RoborockCommand.GET_STATUS,
        response_type=S5MaxStatus,
    )

    # Verify only MQTT was used
    assert result.state == RoborockStateCode.cleaning


async def test_v1_channel_send_decoded_command_with_params(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
) -> None:
    """Test command sending with parameters."""

    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    await v1_channel.subscribe(Mock())

    # Send command with params
    mock_local_channel.response_queue.append(TEST_RESPONSE)
    test_params = {"volume": 80}
    await v1_channel.rpc_channel.send_command(
        RoborockCommand.CHANGE_SOUND_VOLUME,
        response_type=S5MaxStatus,
        params=test_params,
    )

    # Verify command was sent with correct params
    assert mock_local_channel.published_messages
    sent_message = mock_local_channel.published_messages[0]
    assert sent_message
    assert isinstance(sent_message, RoborockMessage)
    assert sent_message.payload
    payload = sent_message.payload.decode()
    json_data = json.loads(payload)
    assert "dps" in json_data
    assert "101" in json_data["dps"]
    decoded_payload = json.loads(json_data["dps"]["101"])
    assert decoded_payload["method"] == "change_sound_volume"
    assert decoded_payload["params"] == {"volume": 80}


async def test_v1_channel_networking_info_retrieved_during_connection(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
    mock_local_session: Mock,
) -> None:
    """Test that networking information is retrieved during local connection setup."""
    # Setup: MQTT returns network info when requested
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)

    # Subscribe - this should trigger network info retrieval for local connection
    await v1_channel.subscribe(Mock())

    # Verify local connection was esablished
    assert v1_channel.is_local_connected

    # Verify network info was requested via MQTT
    assert mock_mqtt_channel.published_messages

    # Verify local session was created with the correct IP
    mock_local_session.assert_called_once_with(mock_data.NETWORK_INFO["ip"])


async def test_v1_channel_networking_info_cached_during_connection(
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
    mock_local_session: Mock,
) -> None:
    """Test that networking information is cached and reused on subsequent connections."""

    # Create a cache with pre-populated network info
    cache_data = CacheData()
    cache_data.network_info[TEST_DEVICE_UID] = TEST_NETWORKING_INFO

    mock_cache = AsyncMock()
    mock_cache.get.return_value = cache_data
    mock_cache.set = AsyncMock()

    # Create V1Channel with the mock cache
    v1_channel = V1Channel(
        device_uid=TEST_DEVICE_UID,
        security_data=TEST_SECURITY_DATA,
        mqtt_channel=mock_mqtt_channel,
        local_session=mock_local_session,
        cache=mock_cache,
    )

    # Subscribe - should use cached network info
    await v1_channel.subscribe(Mock())

    # Verify local connections are established
    assert v1_channel.is_local_connected

    # Verify network info was NOT requested via MQTT (cache hit)
    assert not mock_mqtt_channel.published_messages
    assert not mock_local_channel.published_messages

    # Verify local session was created with the correct IP from cache
    mock_local_session.assert_called_once_with(mock_data.NETWORK_INFO["ip"])

    # Verify cache was accessed but not updated (cache hit)
    mock_cache.get.assert_called_once()
    mock_cache.set.assert_not_called()


# V1Channel edge cases tests


async def test_v1_channel_local_connect_network_info_failure(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
) -> None:
    """Test local connection when network info retrieval fails."""
    mock_mqtt_channel.publish_side_effect = RoborockException("Network info failed")

    with pytest.raises(RoborockException):
        await v1_channel._local_connect()


async def test_v1_channel_local_connect_network_info_failure_fallback_to_cache(
    mock_mqtt_channel: FakeChannel,
    mock_local_session: Mock,
    v1_channel: V1Channel,
    cache: InMemoryCache,
) -> None:
    """Test local connection falls back to cache when network info retrieval fails."""
    # Create a cache with pre-populated network info
    cache_data = CacheData()
    cache_data.network_info[TEST_DEVICE_UID] = TEST_NETWORKING_INFO
    await cache.set(cache_data)

    # Setup: MQTT fails to publish
    mock_mqtt_channel.publish_side_effect = RoborockException("Network info failed")

    # Attempt local connect, forcing a refresh (prefer_cache=False)
    # This should try MQTT, fail, and then fall back to cache
    await v1_channel._local_connect(prefer_cache=False)

    # Verify local connection was established
    assert v1_channel.is_local_connected

    # Verify MQTT was attempted (published message)
    assert mock_mqtt_channel.published_messages

    # Verify local session was created with the correct IP from cache
    mock_local_session.assert_called_once_with(TEST_HOST)


async def test_v1_channel_command_encoding_validation(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_local_channel: Mock,
) -> None:
    """Test that command encoding works for different protocols."""
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    await v1_channel.subscribe(Mock())

    # Send mqtt command and capture the request
    mock_mqtt_channel.response_queue.append(TEST_RESPONSE)
    await v1_channel.mqtt_rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50})
    assert mock_mqtt_channel.published_messages
    mqtt_message = mock_mqtt_channel.published_messages[0]

    # Send local command and capture the request
    mock_local_channel.response_queue.append(TEST_RESPONSE_2)
    await v1_channel.rpc_channel.send_command(RoborockCommand.CHANGE_SOUND_VOLUME, params={"volume": 50})
    assert mock_local_channel.published_messages
    local_message = mock_local_channel.published_messages[0]

    # Verify both are RoborockMessage instances
    assert isinstance(mqtt_message, RoborockMessage)
    assert isinstance(local_message, RoborockMessage)

    # But they should have different protocols
    assert mqtt_message.protocol == RoborockMessageProtocol.RPC_REQUEST
    assert local_message.protocol == RoborockMessageProtocol.GENERAL_REQUEST


async def test_v1_channel_send_map_command(
    v1_channel: V1Channel,
    mock_mqtt_channel: Mock,
    mock_create_map_response_decoder: Mock,
) -> None:
    """Test that the map channel can correctly decode a map response."""
    # Establish connections
    mock_mqtt_channel.response_queue.append(TEST_NETWORK_INFO_RESPONSE)
    await v1_channel.subscribe(Mock())

    # Prepare a mock map response
    decompressed_map_data = b"this is the decompressed map data"
    request_id = 12346  # from the mock_request_id fixture

    # Mock the decoder to return a known response
    map_response = MapResponse(request_id=request_id, data=decompressed_map_data)
    mock_create_map_response_decoder.return_value.return_value = map_response

    # The actual message content doesn't matter as much since the decoder is mocked
    map_response_message = RoborockMessage(
        protocol=RoborockMessageProtocol.MAP_RESPONSE,
        payload=b"dummy_payload",
    )
    mock_mqtt_channel.response_queue.append(map_response_message)

    # Send the command and get the result
    result = await v1_channel.map_rpc_channel.send_command(RoborockCommand.GET_MAP_V1)

    # Verify the result is the data from our mocked decoder
    assert result == decompressed_map_data