1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
|
import asyncio
import pytest
from zigpy import zdo
import zigpy.device
import zigpy.types as t
import zigpy.zdo.types as zdo_types
from .async_mock import AsyncMock, MagicMock, patch, sentinel
def test_commands():
for cmdid, cmdspec in zdo.types.CLUSTERS.items():
assert 0 <= cmdid <= 0xFFFF
assert isinstance(cmdspec, tuple)
for paramname, paramtype in zip(cmdspec[0], cmdspec[1], strict=True):
assert isinstance(paramname, str)
assert hasattr(paramtype, "serialize")
assert hasattr(paramtype, "deserialize")
@pytest.fixture
def zdo_f(app):
ieee = t.EUI64(map(t.uint8_t, [0, 1, 2, 3, 4, 5, 6, 7]))
dev = zigpy.device.Device(app, ieee, 65535)
dev.request = AsyncMock()
app.devices[dev.ieee] = dev
return zdo.ZDO(dev)
def test_deserialize(zdo_f):
hdr, args = zdo_f.deserialize(2, b"\x01\x02\x03xx")
assert hdr.tsn == 1
assert hdr.is_reply is False
assert args == [0x0302]
def test_deserialize_unknown(zdo_f):
with pytest.raises(ValueError):
hdr, args = zdo_f.deserialize(0x0100, b"\x01")
async def test_request(zdo_f):
with patch.object(
zdo_f._device, "get_sequence", wraps=zdo_f._device.get_sequence
) as get_sequence:
await zdo_f.request(2, 65535)
assert zdo_f.device.request.call_count == 1
assert zdo_f.device.request.mock_calls[0].kwargs["expect_reply"] is True
assert get_sequence.call_count == 1
async def test_bind(zdo_f):
cluster = MagicMock()
cluster.endpoint.endpoint_id = 1
cluster.cluster_id = 1026
await zdo_f.bind(cluster)
assert zdo_f.device.request.call_count == 1
assert zdo_f.device.request.mock_calls[0].kwargs["cluster"] == 0x0021
async def test_unbind(zdo_f):
cluster = MagicMock()
cluster.endpoint.endpoint_id = 1
cluster.cluster_id = 1026
await zdo_f.unbind(cluster)
assert zdo_f.device.request.call_count == 1
assert zdo_f.device.request.mock_calls[0].kwargs["cluster"] == 0x0022
@pytest.mark.parametrize(
("remove_children", "rejoin", "flags"),
[
(False, False, 0),
(False, True, 0x80),
(True, False, 0x40),
(True, True, 0xC0),
],
)
async def test_leave(zdo_f, remove_children, rejoin, flags):
"""Test ZDO leave request options."""
with patch.object(zdo_f, "request", AsyncMock()) as req_mock:
await zdo_f.leave(remove_children, rejoin)
assert req_mock.await_count == 1
assert req_mock.await_args[0][0] == 0x0034
assert req_mock.await_args[0][1] == t.EUI64.convert("07:06:05:04:03:02:01:00")
assert req_mock.await_args[0][2] == flags
async def test_permit(zdo_f):
await zdo_f.permit()
assert zdo_f.device.request.call_count == 1
assert zdo_f.device.request.mock_calls[0].kwargs["cluster"] == 0x0036
async def test_broadcast(app):
await zigpy.zdo.broadcast(app, 0x0036, 0, 0, 60, 0)
assert app.send_packet.call_count == 1
packet = app.send_packet.mock_calls[0].args[0]
assert packet.dst.addr_mode == t.AddrMode.Broadcast
assert packet.cluster_id == 0x0036
def _handle_match_desc(zdo_f, profile):
zdo_f.reply = AsyncMock()
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd.Match_Desc_req
zdo_f.handle_message(5, 0x0006, hdr, [None, profile, [], []])
assert zdo_f.reply.call_count == 1
async def test_handle_match_desc_zha(zdo_f):
_handle_match_desc(zdo_f, 260)
await asyncio.wait(asyncio.all_tasks(), return_when=asyncio.FIRST_COMPLETED)
assert zdo_f.reply.await_count == 1
assert zdo_f.reply.call_args[0][3]
async def test_handle_match_desc_generic(zdo_f):
_handle_match_desc(zdo_f, 0)
await asyncio.wait(asyncio.all_tasks(), return_when=asyncio.FIRST_COMPLETED)
assert zdo_f.reply.await_count == 1
assert not zdo_f.reply.call_args[0][3]
async def test_handle_nwk_addr(zdo_f):
ieee = zdo_f._device.application.state.node_info.ieee
zdo_f.reply = MagicMock()
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd.NWK_addr_req
zdo_f.handle_message(5, 0x0000, hdr, [ieee, 0x00])
assert zdo_f.reply.call_count == 1
async def test_handle_ieee_addr(zdo_f):
nwk = zdo_f._device.application.state.node_info.nwk
zdo_f.reply = MagicMock()
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd.IEEE_addr_req
zdo_f.handle_message(5, 0x0001, hdr, [nwk, 0x00])
assert zdo_f.reply.call_count == 1
def test_handle_announce(zdo_f):
dev = zdo_f._device
listener = MagicMock()
zdo_f.add_listener(listener)
dev._application.devices.pop(dev.ieee)
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd.Device_annce
zdo_f.handle_message(
5, 0x0013, hdr, [dev.nwk, dev.ieee, 0], dst_addressing=sentinel.dst_addr
)
assert listener.device_announce.call_count == 1
assert listener.device_announce.call_args[0][0] is dev
assert listener.zdo_device_annce.call_count == 1
assert listener.zdo_device_annce.call_args[0][0] is dev
assert listener.zdo_device_annce.call_args[0][1] is sentinel.dst_addr
assert listener.zdo_device_annce.call_args[0][2] is hdr
assert listener.zdo_device_annce.call_args[0][3] == [dev.nwk, dev.ieee, 0]
def test_handle_permit_join(zdo_f):
listener = MagicMock()
zdo_f.add_listener(listener)
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd.Mgmt_Permit_Joining_req
zdo_f.handle_message(5, 0x0036, hdr, [100, 1])
assert listener.permit_duration.call_count == 1
def test_handle_unsupported(zdo_f):
listener = MagicMock()
zdo_f.add_listener(listener)
hdr = MagicMock()
hdr.command_id = zdo_types.ZDOCmd(0xFFFF)
assert hdr.command_id not in list(zdo_types.ZDOCmd)
zdo_f.request = MagicMock()
zdo_f.reply = MagicMock()
zdo_f.handle_message(5, 0xFFFF, hdr, [])
assert listener.zdo_undefined_0xffff.call_count == 1
assert zdo_f.request.call_count == 0
assert zdo_f.reply.call_count == 0
def test_device_accessor(zdo_f):
assert zdo_f.device.nwk == 65535
async def test_reply(zdo_f):
zdo_f.device.request = AsyncMock()
await zdo_f.reply(0x0005)
assert zdo_f.device.request.call_count == 1
def test_get_attr_error(zdo_f):
with pytest.raises(AttributeError):
zdo_f.no_such_attribute()
async def test_reply_tsn_override(zdo_f, monkeypatch):
clusters = MagicMock()
clusters.__getitem__.return_value = (
sentinel.param_names,
sentinel.scheam,
)
monkeypatch.setattr(zdo_types, "CLUSTERS", clusters)
mock_ser = MagicMock()
mock_ser.return_value = b"\xaa\x55"
monkeypatch.setattr(t, "serialize", mock_ser)
await zdo_f.reply(sentinel.cmd, sentinel.arg1, sentinel.arg2)
seq = zdo_f.device.request.mock_calls[0].kwargs["sequence"]
data = zdo_f.device.request.mock_calls[0].kwargs["data"]
assert seq == 1
assert data[0] == 1
assert data[1:3] == b"\xaa\x55"
# override tsn
tsn = 0x23
await zdo_f.reply(sentinel.cmd, sentinel.arg1, sentinel.arg2, tsn=tsn)
seq = zdo_f.device.request.mock_calls[1].kwargs["sequence"]
data = zdo_f.device.request.mock_calls[1].kwargs["data"]
assert seq == tsn
assert data[0] == tsn
assert data[1:3] == b"\xaa\x55"
|