File: test_multicast.py

package info (click to toggle)
python-bellows 0.40.5-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 992 kB
  • sloc: python: 13,630; sh: 7; makefile: 4
file content (194 lines) | stat: -rw-r--r-- 6,175 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
from unittest.mock import AsyncMock, MagicMock, sentinel

import pytest
from zigpy.endpoint import Endpoint

import bellows.ezsp
import bellows.multicast
import bellows.types as t

from tests.common import mock_ezsp_commands

# Value the ezsp_f fixture makes the mocked getConfigurationValue report.
# test_initialize and test_initialize_fail expect exactly this many
# getMulticastTableEntry calls.
CUSTOM_SIZE = 12


@pytest.fixture
def ezsp_f():
    """Build an EZSPv8 handler whose configuration query reports CUSTOM_SIZE."""
    handler = bellows.ezsp.v8.EZSPv8(MagicMock(), MagicMock())
    mock_ezsp_commands(handler)

    # Canned reply: a success status paired with the size the tests rely on.
    canned_reply = [t.EmberStatus.SUCCESS, CUSTOM_SIZE]
    handler.getConfigurationValue.return_value = canned_reply

    return handler


@pytest.fixture
def multicast(ezsp_f):
    """Provide a Multicast instance constructed from the ezsp_f fixture."""
    return bellows.multicast.Multicast(ezsp_f)


async def test_initialize(multicast):
    """Check _initialize against a table whose first entries are in use.

    The stubbed getMulticastTableEntry returns a non-zero endpoint for the
    first ``active_multicasts`` calls and endpoint 0 afterwards. The test
    expects CUSTOM_SIZE table reads and ``CUSTOM_SIZE - active_multicasts``
    items in ``_available``.
    """
    first_group = 0x0200
    active_multicasts = 4
    served = 0  # number of table entries handed out so far

    async def fake_get_entry(*args):
        nonlocal served
        group = first_group + served
        in_use = served < active_multicasts

        entry = t.EmberMulticastTableEntry()
        entry.endpoint = t.uint8_t(group % 3 + 1) if in_use else t.uint8_t(0)
        entry.multicastId = t.EmberMulticastId(group)
        entry.networkIndex = t.uint8_t(0)

        served += 1
        return [t.EmberStatus.SUCCESS, entry]

    multicast._ezsp.getMulticastTableEntry.side_effect = fake_get_entry

    await multicast._initialize()

    table_reads = multicast._ezsp.getMulticastTableEntry.call_count
    assert table_reads == CUSTOM_SIZE
    assert len(multicast._available) == CUSTOM_SIZE - active_multicasts


async def test_initialize_fail_configured_size(multicast):
    """Expect no table reads when the configuration query returns ERR_FATAL."""
    failed_reply = (t.EmberStatus.ERR_FATAL, 16)
    multicast._ezsp.getConfigurationValue.return_value = failed_reply

    await multicast._initialize()

    handler = multicast._ezsp
    assert len(multicast._available) == 0
    assert handler.getMulticastTableEntry.call_count == 0


async def test_initialize_fail(multicast):
    """Check _initialize when every table read answers ERR_FATAL.

    The test expects all CUSTOM_SIZE entries to be queried and ``_available``
    to stay empty.
    """
    first_group = 0x0200
    served = 0  # number of stubbed table reads answered so far

    async def failing_get_entry(*args):
        nonlocal served
        group = first_group + served

        entry = t.EmberMulticastTableEntry()
        entry.endpoint = t.uint8_t(group % 3 + 1)
        entry.multicastId = t.EmberMulticastId(group)
        entry.networkIndex = t.uint8_t(0)

        served += 1
        return [t.EmberStatus.ERR_FATAL, entry]

    multicast._ezsp.getMulticastTableEntry.side_effect = failing_get_entry

    await multicast._initialize()

    handler = multicast._ezsp
    assert len(multicast._available) == 0
    assert handler.getMulticastTableEntry.call_count == CUSTOM_SIZE


async def test_startup(multicast):
    """Check that startup awaits _initialize once and subscribes each group.

    The coordinator exposes a ZDO sentinel at endpoint 0 and one mocked
    Endpoint that is a member of three groups; ``subscribe`` is expected to
    be called once per membership.
    """
    member_ep = MagicMock(spec_set=Endpoint)
    member_ep.member_of = [sentinel.grp] * 3

    coordinator = MagicMock()
    coordinator.endpoints = {0: sentinel.ZDO, 1: member_ep}

    multicast._initialize = AsyncMock()
    multicast.subscribe = MagicMock(side_effect=AsyncMock())

    await multicast.startup(coordinator)

    assert multicast._initialize.await_count == 1
    subscribe = multicast.subscribe
    assert subscribe.call_count == len(member_ep.member_of)
    assert subscribe.call_args[0][0] == sentinel.grp


def _subscribe(multicast, group_id, success=True):
    """Call ``multicast.subscribe`` with a stubbed ``setMulticastTableEntry``.

    A fresh MagicMock is installed on ``multicast._ezsp``. Its async side
    effect answers ``[EmberStatus.SUCCESS]`` when *success* is true and
    ``[EmberStatus.ERR_FATAL]`` otherwise. The result of
    ``multicast.subscribe(group_id)`` is returned as-is, so the caller is
    responsible for awaiting it.
    """

    async def fake_set_entry(*args):
        status = t.EmberStatus.SUCCESS if success else t.EmberStatus.ERR_FATAL
        return [status]

    multicast._ezsp.setMulticastTableEntry = MagicMock(side_effect=fake_set_entry)
    return multicast.subscribe(group_id)


async def test_subscribe(multicast):
    """Check subscribing to a group, then subscribing to it again.

    The first subscribe is expected to write one multicast table entry
    carrying the group id and to record the group in ``_multicast``. The
    second subscribe to the same group is expected to succeed without
    writing to the table again.
    """
    grp_id = 0x0200
    multicast._available.add(1)
    # NOTE: ``multicast._ezsp`` is deliberately left as provided by the
    # ``multicast`` fixture. ``ezsp_f`` is not requested by this test, so the
    # bare name would be the module-level fixture definition, not a handler.

    ret = await _subscribe(multicast, grp_id, success=True)
    assert ret == t.EmberStatus.SUCCESS
    set_entry = multicast._ezsp.setMulticastTableEntry
    assert set_entry.call_count == 1
    assert set_entry.call_args[0][1].multicastId == grp_id
    assert grp_id in multicast._multicast

    # _subscribe installs a fresh stub on every call, so re-read it afterwards
    # instead of reusing the handle obtained above.
    set_entry.reset_mock()
    ret = await _subscribe(multicast, grp_id, success=True)
    assert ret == t.EmberStatus.SUCCESS
    set_entry = multicast._ezsp.setMulticastTableEntry
    assert set_entry.call_count == 0
    assert grp_id in multicast._multicast


async def test_subscribe_fail(multicast):
    """Check subscribe when the table write answers ERR_FATAL.

    One write attempt carrying the group id is expected. The group must not
    appear in ``_multicast`` and ``_available`` must still hold one item.
    """
    group = 0x0200
    multicast._available.add(1)

    status = await _subscribe(multicast, group, success=False)
    assert status != t.EmberStatus.SUCCESS

    stub = multicast._ezsp.setMulticastTableEntry
    assert stub.call_count == 1
    written_entry = stub.call_args[0][1]
    assert written_entry.multicastId == group

    assert group not in multicast._multicast
    assert len(multicast._available) == 1


async def test_subscribe_no_avail(multicast):
    """Expect a non-SUCCESS result when the test adds nothing to ``_available``."""
    status = await _subscribe(multicast, 0x0200, success=True)
    assert status != t.EmberStatus.SUCCESS


def _unsubscribe(multicast, group_id, success=True):
    """Call ``multicast.unsubscribe`` with a stubbed ``setMulticastTableEntry``.

    A fresh MagicMock is installed on ``multicast._ezsp``. Its async side
    effect answers ``[EmberStatus.SUCCESS]`` when *success* is true and
    ``[EmberStatus.ERR_FATAL]`` otherwise. The result of
    ``multicast.unsubscribe(group_id)`` is returned as-is, so the caller is
    responsible for awaiting it.
    """

    async def fake_set_entry(*args):
        status = t.EmberStatus.SUCCESS if success else t.EmberStatus.ERR_FATAL
        return [status]

    multicast._ezsp.setMulticastTableEntry = MagicMock(side_effect=fake_set_entry)
    return multicast.unsubscribe(group_id)


async def test_unsubscribe(multicast):
    """Check unsubscribing from a subscribed group, then unsubscribing again.

    The first unsubscribe is expected to succeed with one table write, drop
    the group from ``_multicast`` and leave one item in ``_available``. The
    second is expected to return a non-SUCCESS status without any table write.
    """
    group = 0x0200
    multicast._available.add(1)
    await _subscribe(multicast, group, success=True)

    # First unsubscribe: the group is currently subscribed.
    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=True)
    assert status == t.EmberStatus.SUCCESS
    stub = multicast._ezsp.setMulticastTableEntry
    assert stub.call_count == 1
    assert group not in multicast._multicast
    assert len(multicast._available) == 1

    # Second unsubscribe: the group is no longer subscribed.
    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=True)
    assert status != t.EmberStatus.SUCCESS
    stub = multicast._ezsp.setMulticastTableEntry
    assert stub.call_count == 0
    assert group not in multicast._multicast
    assert len(multicast._available) == 1


async def test_unsubscribe_fail(multicast):
    """Check unsubscribe when the table write answers ERR_FATAL.

    One write attempt is expected. The group must remain in ``_multicast``
    and ``_available`` must stay empty.
    """
    group = 0x0200
    multicast._available.add(1)
    await _subscribe(multicast, group, success=True)

    multicast._ezsp.setMulticastTableEntry.reset_mock()
    status = await _unsubscribe(multicast, group, success=False)
    assert status != t.EmberStatus.SUCCESS

    stub = multicast._ezsp.setMulticastTableEntry
    assert stub.call_count == 1

    assert group in multicast._multicast
    assert len(multicast._available) == 0