File: routing_test.py

package info (click to toggle)
python-xknx 3.10.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,044 kB
  • sloc: python: 40,087; javascript: 8,556; makefile: 32; sh: 12
file content (189 lines) | stat: -rw-r--r-- 7,499 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
"""Tests for KNX/IP routing connections."""

import asyncio
from unittest.mock import AsyncMock, Mock, call, patch

from xknx import XKNX
from xknx.cemi import CEMIFrame, CEMILData, CEMIMessageCode
from xknx.io import Routing
from xknx.io.routing import (
    BUSY_DECREMENT_TIME,
    BUSY_INCREMENT_COOLDOWN,
    BUSY_RANDOM_TIME_FACTOR,
    BUSY_SLOWDURATION_TIME_FACTOR,
    ROUTING_INDICATION_WAIT_TIME,
    _RoutingFlowControl,
)
from xknx.knxip import KNXIPFrame, RoutingBusy, RoutingIndication
from xknx.telegram import IndividualAddress, Telegram, TelegramDirection, tpci

from ..conftest import EventLoopClockAdvancer


class TestRouting:
    """Tests for the xknx.io.Routing interface."""

    def setup_method(self) -> None:
        """Provide a fresh XKNX instance and callback mock for every test."""
        # pylint: disable=attribute-defined-outside-init
        self.xknx = XKNX()
        self.cemi_received_mock = AsyncMock()

    @patch("xknx.io.Routing._send_knxipframe")
    async def test_request_received_callback(self, send_knxipframe_mock: Mock) -> None:
        """Test that an incoming T_Connect is answered with a T_Disconnect."""
        own_address = "1.0.255"
        remote_endpoint = ("192.168.1.2", 3671)
        interface = Routing(
            self.xknx,
            individual_address=None,
            cemi_received_callback=self.xknx.knxip_interface.cemi_received,
            local_ip="192.168.1.1",
        )
        self.xknx.knxip_interface._interface = interface
        # the current address has to be set for the management telegram
        # to be processed
        self.xknx.current_address = IndividualAddress(own_address)

        # 6 byte KNXnet/IP header followed by the cEMI frame:
        # L_Data.ind T_Connect from 1.0.250 to 1.0.255
        header_length = 6
        raw_indication = bytes.fromhex("0610 0530 0010 2900b06010fa10ff0080")
        incoming_cemi = CEMIFrame.from_knx(raw_indication[header_length:])
        incoming_telegram = incoming_cemi.data.telegram()
        incoming_telegram.direction = TelegramDirection.INCOMING

        # expected answer: T_Disconnect sent back to the originator of the
        # T_Connect, wrapped in a RoutingIndication
        disconnect_telegram = Telegram(
            destination_address=IndividualAddress(incoming_telegram.source_address),
            tpci=tpci.TDisconnect(),
        )
        disconnect_data = CEMILData.init_from_telegram(
            telegram=disconnect_telegram,
            src_addr=IndividualAddress(own_address),
        )
        disconnect_cemi = CEMIFrame(
            code=CEMIMessageCode.L_DATA_IND,
            data=disconnect_data,
        )
        expected_body = RoutingIndication(raw_cemi=disconnect_cemi.to_knx())
        expected_frame = KNXIPFrame.init_from_body(expected_body)

        interface.transport.data_received_callback(raw_indication, remote_endpoint)
        await asyncio.sleep(0)
        expected_calls = [call(expected_frame)]
        assert send_knxipframe_mock.call_args_list == expected_calls
        await asyncio.sleep(0)  # await local L_Data.con

    @patch("logging.Logger.warning")
    async def test_routing_lost_message(self, logging_mock: Mock) -> None:
        """Test that a received RoutingLostMessage frame is logged as a warning."""
        remote_endpoint = ("192.168.1.2", 3671)
        interface = Routing(
            self.xknx,
            individual_address=None,
            cemi_received_callback=AsyncMock(),
            local_ip="192.168.1.1",
        )
        # 6 byte KNXnet/IP header followed by a 4 byte body; the trailing
        # 0x0005 is the lost message count expected in the log call below
        raw_lost_message = bytes.fromhex("0610 0531 000a 0400 0005")
        interface.transport.data_received_callback(raw_lost_message, remote_endpoint)
        logging_mock.assert_called_once_with(
            "RoutingLostMessage received from %s - %s lost messages.",
            "192.168.1.2",
            5,
        )


class TestFlowControl:
    """Test class for KNXnet/IP routing flow control.

    Both tests drive a `_RoutingFlowControl` instance directly and use the
    `time_travel` fixture to advance the event loop clock instead of sleeping.
    """

    async def test_basic_throttling(self, time_travel: EventLoopClockAdvancer) -> None:
        """Test throttling outgoing frames.

        A send directly following another one is expected to be held back
        until ROUTING_INDICATION_WAIT_TIME has elapsed; a send after an idle
        period is expected to pass without delay.
        """
        flow_control = _RoutingFlowControl()
        # `mock` is called once for every send that got past the throttle
        mock = Mock()

        async def test_send() -> None:
            # simulates sending one frame through the flow control
            async with flow_control.throttle():
                mock()

        # first send is called immediately
        task = asyncio.create_task(test_send())
        # yield to the event loop once so the task gets a chance to run
        await asyncio.sleep(0)
        assert mock.call_count == 1
        assert task.done()
        mock.reset_mock()

        # second send is throttled
        task = asyncio.create_task(test_send())
        await asyncio.sleep(0)
        assert mock.call_count == 0
        # still blocked after a quarter of the wait time ...
        await time_travel(ROUTING_INDICATION_WAIT_TIME / 4)
        assert not task.done()
        # ... and released once the remaining three quarters have passed
        await time_travel(ROUTING_INDICATION_WAIT_TIME / 4 * 3)
        assert task.done()
        assert mock.call_count == 1
        mock.reset_mock()

        # later send is called immediately
        await time_travel(ROUTING_INDICATION_WAIT_TIME)
        task = asyncio.create_task(test_send())
        await asyncio.sleep(0)
        assert mock.call_count == 1
        assert task.done()
        mock.reset_mock()

    @patch("random.random")
    async def test_routing_busy(
        self, random_mock: Mock, time_travel: EventLoopClockAdvancer
    ) -> None:
        """Test throttling on received RoutingBusy frame.

        `random.random` is patched to return a fixed value; the test uses that
        value to compute the additional random wait time it expects after
        multiple RoutingBusy frames.
        """
        flow_control = _RoutingFlowControl()
        # `mock` is called once for every send that got past the throttle
        mock = Mock()
        test_wait_time_ms = 100
        random_mock.return_value = 0.5

        async def test_send() -> None:
            # simulates sending one frame through the flow control
            async with flow_control.throttle():
                mock()

        test_busy = RoutingBusy(wait_time=test_wait_time_ms)

        # single RoutingBusy frame: sending is blocked for its wait time
        flow_control.handle_routing_busy(test_busy)
        task = asyncio.create_task(test_send())
        await asyncio.sleep(0)
        assert mock.call_count == 0
        # wait_time is given in milliseconds, time_travel takes seconds
        await time_travel(test_wait_time_ms / 1000)
        assert mock.call_count == 1
        assert task.done()
        # no slowduration for just 1 RoutingBusy
        assert flow_control._timer_task.done()
        mock.reset_mock()

        # multiple RoutingBusy frames
        flow_control.handle_routing_busy(test_busy)
        # next frame arrives after the cooldown with a shorter wait time:
        # the asserts expect the stored wait time to stay at the longer value
        # and the busy frame counter to be 1
        # NOTE(review): why the counter is 1 rather than 2 here depends on
        # _RoutingFlowControl internals not visible in this file - confirm
        await time_travel(BUSY_INCREMENT_COOLDOWN)
        flow_control.handle_routing_busy(RoutingBusy(wait_time=test_wait_time_ms // 2))
        assert flow_control._received_busy_frames == 1
        assert flow_control._wait_time_ms == test_wait_time_ms
        # a frame received without any time passing since the previous one
        # (within the cooldown window) is not counted, but its longer wait time
        # is taken over
        flow_control.handle_routing_busy(RoutingBusy(wait_time=test_wait_time_ms * 2))
        assert flow_control._received_busy_frames == 1
        assert flow_control._wait_time_ms == test_wait_time_ms * 2
        # add second busy frame after cooldown has passed
        # (its shorter wait time must not reduce the stored wait time)
        await time_travel(BUSY_INCREMENT_COOLDOWN)
        flow_control.handle_routing_busy(RoutingBusy(wait_time=test_wait_time_ms // 2))
        assert flow_control._wait_time_ms == test_wait_time_ms * 2
        assert flow_control._received_busy_frames == 2

        task = asyncio.create_task(test_send())
        # the task has not been given a chance to run yet at this point
        assert mock.call_count == 0
        # BUSY_INCREMENT_COOLDOWN has already elapsed since the frame carrying
        # the 2x wait time was received, so only the remainder is advanced
        await time_travel(test_wait_time_ms * 2 / 1000 - BUSY_INCREMENT_COOLDOWN)
        assert mock.call_count == 0  # not yet resolved, wait for additional random time
        # expected random time: counted busy frames * factor * patched random value
        await time_travel(
            flow_control._received_busy_frames * BUSY_RANDOM_TIME_FACTOR * random_mock()
        )
        assert mock.call_count == 1
        assert task.done()
        # slowduration
        # the timer task is expected to keep running through the slowduration
        # and one decrement time, and to finish after the second decrement time
        assert not flow_control._timer_task.done()
        await time_travel(2 * BUSY_SLOWDURATION_TIME_FACTOR)  # _received_busy_frames 2
        await time_travel(BUSY_DECREMENT_TIME)  # and decrement time
        assert not flow_control._timer_task.done()
        await time_travel(BUSY_DECREMENT_TIME)  # and second decrement time
        assert flow_control._timer_task.done()