1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
from unittest.mock import MagicMock, call
import pytest
import zigpy.state
import bellows.ezsp.v5
import bellows.types as t
from tests.common import mock_ezsp_commands
@pytest.fixture
def ezsp_f():
"""EZSP v5 protocol handler."""
ezsp = bellows.ezsp.v5.EZSPv5(MagicMock(), MagicMock())
mock_ezsp_commands(ezsp)
return ezsp
def test_ezsp_frame(ezsp_f):
ezsp_f._seq = 0x22
data = ezsp_f._ezsp_frame("version", 5)
assert data == b"\x22\x00\xff\x00\x00\x05"
def test_ezsp_frame_rx(ezsp_f):
"""Test receiving a version frame."""
ezsp_f(b"\x01\x00\xff\x00\x00\x01\x02\x34\x12")
assert ezsp_f._handle_callback.call_count == 1
assert ezsp_f._handle_callback.call_args[0][0] == "version"
assert ezsp_f._handle_callback.call_args[0][1] == [0x01, 0x02, 0x1234]
async def test_pre_permit(ezsp_f):
"""Test pre permit."""
ezsp_f.addTransientLinkKey.return_value = [t.EmberStatus.SUCCESS]
await ezsp_f.pre_permit(1)
assert ezsp_f.addTransientLinkKey.mock_calls == [
call(
partner=t.EUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"),
transientKey=t.KeyData(b"ZigBeeAlliance09"),
)
]
async def test_read_address_table(ezsp_f):
def get_addr_table_node_id(addressTableIndex):
return (
{
16: t.EmberNodeId(0x44CB),
17: t.EmberNodeId(0x0702),
18: t.EmberNodeId(0x0000), # bogus entry
}.get(addressTableIndex, t.EmberNodeId(0xFFFF)),
)
ezsp_f.getAddressTableRemoteNodeId.side_effect = get_addr_table_node_id
def get_addr_table_eui64(addressTableIndex):
if addressTableIndex < 16:
return (t.EUI64.convert("ff:ff:ff:ff:ff:ff:ff:ff"),)
elif 16 <= addressTableIndex <= 18:
return (
{
16: t.EUI64.convert("cc:cc:cc:ff:fe:e6:8e:ca"),
17: t.EUI64.convert("ec:1b:bd:ff:fe:2f:41:a4"),
18: t.EUI64.convert("00:00:00:00:00:00:00:00"),
}[addressTableIndex],
)
else:
return (t.EUI64.convert("00:00:00:00:00:00:00:00"),)
ezsp_f.getConfigurationValue.return_value = (t.EmberStatus.SUCCESS, 20)
ezsp_f.getAddressTableRemoteEui64.side_effect = get_addr_table_eui64
address_table = [key async for key in ezsp_f.read_address_table()]
assert address_table == [
(0x44CB, t.EUI64.convert("cc:cc:cc:ff:fe:e6:8e:ca")),
(0x0702, t.EUI64.convert("ec:1b:bd:ff:fe:2f:41:a4")),
]
async def test_write_nwk_frame_counter(ezsp_f) -> None:
ezsp_f.networkState.return_value = (t.EmberNetworkStatus.NO_NETWORK,)
ezsp_f.setValue.return_value = (t.EmberStatus.SUCCESS,)
await ezsp_f.write_nwk_frame_counter(12345678)
assert ezsp_f.setValue.mock_calls == [
call(
valueId=t.EzspValueId.VALUE_NWK_FRAME_COUNTER,
value=t.uint32_t(12345678).serialize(),
),
]
async def test_write_aps_frame_counter(ezsp_f) -> None:
ezsp_f.networkState.return_value = (t.EmberNetworkStatus.NO_NETWORK,)
ezsp_f.setValue.return_value = (t.EmberStatus.SUCCESS,)
await ezsp_f.write_aps_frame_counter(12345678)
assert ezsp_f.setValue.mock_calls == [
call(
valueId=t.EzspValueId.VALUE_APS_FRAME_COUNTER,
value=t.uint32_t(12345678).serialize(),
),
]
|