File: test_listeners.py

package info (click to toggle)
zigpy 0.80.1-2
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 3,012 kB
  • sloc: python: 34,822; sql: 2,109; makefile: 7
file content (194 lines) | stat: -rw-r--r-- 5,809 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import asyncio
import logging
from unittest import mock

import pytest

from zigpy import listeners
from zigpy.zcl import foundation
import zigpy.zcl.clusters.general
import zigpy.zdo.types as zdo_t


def make_hdr(cmd, **kwargs):
    """Create the ZCL header the tests pair with `cmd`.

    The header is built with `foundation.ZCLHeader.cluster`, using a fixed TSN
    of 0x12 and the command ID read from `cmd.command.id`. Any extra keyword
    arguments are forwarded unchanged.
    """
    command_id = cmd.command.id
    return foundation.ZCLHeader.cluster(tsn=0x12, command_id=command_id, **kwargs)


# Command schemas shared by the tests below; they are used both as matchers and
# to construct the commands passed to `resolve()`.
query_next_image = zigpy.zcl.clusters.general.Ota.commands_by_name[
    "query_next_image"
].schema
on, off, toggle = (
    zigpy.zcl.clusters.general.OnOff.commands_by_name[name].schema
    for name in ("on", "off", "toggle")
)


async def test_future_listener():
    """A future listener resolves its future with the first matching command."""
    loop = asyncio.get_running_loop()
    listener = listeners.FutureListener(
        matchers=[
            query_next_image(manufacturer_code=0x1234),
            on(),
            lambda hdr, cmd: hdr.command_id == 0x02,
        ],
        future=loop.create_future(),
    )

    # `off()` is not accepted by any matcher
    assert not listener.resolve(make_hdr(off()), off())

    # A query with the wrong `manufacturer_code` is rejected as well
    mismatched_query = query_next_image(
        field_control=0,
        manufacturer_code=0x5678,
        image_type=0x0000,
        current_file_version=0x00000000,
    )
    assert not listener.resolve(make_hdr(query_next_image()), mismatched_query)

    # Only `on()` matches, and the future receives the (header, command) pair
    assert listener.resolve(make_hdr(on()), on())
    resolved = listener.future.result()
    assert resolved == (make_hdr(on()), on())

    # With the future already resolved, further matches are refused
    assert not listener.resolve(make_hdr(on()), on())

    # Install a fresh future, bypassing the listener's own attribute assignment
    object.__setattr__(listener, "future", loop.create_future())
    matching_query = query_next_image(
        field_control=0,
        manufacturer_code=0x1234,  # correct `manufacturer_code`
        image_type=0x0000,
        current_file_version=0x00000000,
    )
    assert listener.resolve(make_hdr(matching_query), matching_query)
    resolved = listener.future.result()
    assert resolved == (make_hdr(matching_query), matching_query)

    # Install a fresh future once more
    object.__setattr__(listener, "future", loop.create_future())

    # The function matcher works
    assert listener.resolve(make_hdr(toggle()), toggle())
    resolved = listener.future.result()
    assert resolved == (make_hdr(toggle()), toggle())


async def test_future_listener_cancellation():
    """Cancelling a future listener succeeds repeatedly and cancels its future."""
    pending = asyncio.get_running_loop().create_future()
    listener = listeners.FutureListener(matchers=[], future=pending)

    # Cancelling again after the first call still reports success
    for _ in range(3):
        assert listener.cancel()

    # Awaiting the listener's future now raises the cancellation error
    with pytest.raises(asyncio.CancelledError):
        await listener.future


async def test_callback_listener():
    """A callback listener invokes its callback for every matching command."""
    listener = listeners.CallbackListener(
        matchers=[query_next_image(manufacturer_code=0x1234), on()],
        callback=mock.Mock(),
    )

    # `off()` is not accepted by any matcher
    assert not listener.resolve(make_hdr(off()), off())

    # A query with the wrong `manufacturer_code` is rejected as well
    mismatched_query = query_next_image(
        field_control=0,
        manufacturer_code=0x5678,
        image_type=0x0000,
        current_file_version=0x00000000,
    )
    assert not listener.resolve(make_hdr(query_next_image()), mismatched_query)

    # Only `on()` matches
    assert listener.resolve(make_hdr(on()), on())
    expected_once = [mock.call(make_hdr(on()), on())]
    assert listener.callback.mock_calls == expected_once

    # Cancellation is not supported, so subsequent matches still work
    assert not listener.cancel()
    assert listener.resolve(make_hdr(on()), on())
    expected_twice = [mock.call(make_hdr(on()), on()) for _ in range(2)]
    assert listener.callback.mock_calls == expected_twice


async def test_callback_listener_error(caplog):
    """A callback that raises is still reported as a match, and the error is logged."""
    failing_callback = mock.Mock(side_effect=RuntimeError("Uh oh"))
    listener = listeners.CallbackListener(matchers=[on()], callback=failing_callback)

    # `resolve()` must return truthy instead of propagating the RuntimeError
    with caplog.at_level(logging.WARNING):
        assert listener.resolve(make_hdr(on()), on())

    captured = caplog.text
    for fragment in (
        "Caught an exception while executing callback",
        "RuntimeError: Uh oh",
    ):
        assert fragment in captured


async def test_listener_callback_matches():
    """A function matcher returning True lets the command reach the callback."""

    def accept_all(hdr, command):
        return True

    listener = listeners.CallbackListener(matchers=[accept_all], callback=mock.Mock())

    assert listener.resolve(make_hdr(off()), off())

    expected_calls = [mock.call(make_hdr(off()), off())]
    assert listener.callback.mock_calls == expected_calls


async def test_listener_callback_no_matches():
    """A function matcher returning False keeps the command from the callback."""

    def reject_all(hdr, command):
        return False

    listener = listeners.CallbackListener(matchers=[reject_all], callback=mock.Mock())

    assert not listener.resolve(make_hdr(off()), off())

    # The callback must never have been invoked
    recorded_calls = listener.callback.mock_calls
    assert recorded_calls == []


async def test_listener_callback_invalid_matcher(caplog):
    """A plain `object()` matcher never matches and shows up in the captured log."""
    bogus_matcher = object()
    listener = listeners.CallbackListener(
        matchers=[bogus_matcher],
        callback=mock.Mock(),
    )

    with caplog.at_level(logging.WARNING):
        matched = listener.resolve(make_hdr(off()), off())

    assert not matched
    assert listener.callback.mock_calls == []

    # The log message includes the repr of the offending matcher
    assert f"Matcher {listener.matchers[0]!r} and command" in caplog.text


async def test_listener_callback_invalid_call(caplog):
    """Raw bytes passed as the command do not match a schema matcher and are logged."""
    listener = listeners.CallbackListener(matchers=[on()], callback=mock.Mock())
    raw_payload = b"data"

    with caplog.at_level(logging.WARNING):
        matched = listener.resolve(make_hdr(on()), raw_payload)

    assert not matched
    assert listener.callback.mock_calls == []

    # The log message includes the repr of the matcher that was compared
    assert f"Matcher {listener.matchers[0]!r} and command" in caplog.text


async def test_listener_callback_zdo(caplog):
    """A ZDO header and command do not match a ZCL matcher and log nothing."""
    listener = listeners.CallbackListener(
        matchers=[query_next_image(manufacturer_code=0x1234)],
        callback=mock.Mock(),
    )

    header = zdo_t.ZDOHeader(command_id=zdo_t.ZDOCmd.NWK_addr_req, tsn=0x01)
    arguments = [0x0000]

    with caplog.at_level(logging.WARNING):
        matched = listener.resolve(header, arguments)

    assert not matched

    # Unlike the invalid-matcher cases, the mismatch leaves the captured log empty
    assert caplog.text == ""