File: connection_manager_test.py

package info (click to toggle)
python-xknx 3.6.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,012 kB
  • sloc: python: 39,710; javascript: 8,556; makefile: 27; sh: 12
file content (154 lines) | stat: -rw-r--r-- 5,959 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
"""Unit test for connection manager."""

import asyncio
from datetime import datetime
import threading
from typing import Any
from unittest.mock import Mock, patch

from xknx import XKNX
from xknx.core import XknxConnectionState, XknxConnectionType
from xknx.io import ConnectionConfig
from xknx.util import asyncio_timeout


class TestConnectionManager:
    """Test class for connection manager.

    Covers callback registration, state transitions, the ``connected``
    event, threaded operation and the connection information attributes
    (counters, timestamp, connection type) of ``xknx.connection_manager``.
    """

    #
    # TEST REGISTER/UNREGISTER
    #
    async def test_register(self) -> None:
        """Test connection_state_changed after register.

        The registered callback is expected to be called exactly once and
        to receive only the new state, even though a connection type is
        passed to `connection_state_changed`.
        """

        xknx = XKNX()
        connection_state_changed_cb = Mock()
        xknx.connection_manager.register_connection_state_changed_cb(
            connection_state_changed_cb
        )
        xknx.connection_manager.connection_state_changed(
            XknxConnectionState.CONNECTED, XknxConnectionType.ROUTING_SECURE
        )
        connection_state_changed_cb.assert_called_once_with(
            XknxConnectionState.CONNECTED
        )

    async def test_unregister(self) -> None:
        """Test unregister after register.

        A callback that was registered and then unregistered must not be
        called on a subsequent state change.
        """

        xknx = XKNX()
        connection_state_changed_cb = Mock()
        xknx.connection_manager.register_connection_state_changed_cb(
            connection_state_changed_cb
        )
        xknx.connection_manager.unregister_connection_state_changed_cb(
            connection_state_changed_cb
        )
        xknx.connection_manager.connection_state_changed(XknxConnectionState.CONNECTED)
        connection_state_changed_cb.assert_not_called()

    #
    # TEST PROCESS
    #
    async def test_state_return(self) -> None:
        """Test should return if current state equals parameter.

        A fresh manager is asserted to be DISCONNECTED; signalling
        DISCONNECTED again must not trigger the callback.
        """

        xknx = XKNX()
        connection_state_changed_cb = Mock()
        xknx.connection_manager.register_connection_state_changed_cb(
            connection_state_changed_cb
        )
        assert xknx.connection_manager.state == XknxConnectionState.DISCONNECTED
        xknx.connection_manager.connection_state_changed(
            XknxConnectionState.DISCONNECTED
        )
        connection_state_changed_cb.assert_not_called()

    #
    # TEST CONNECTED
    #
    async def test_connected_event(self) -> None:
        """Test connected event callback.

        The ``connected`` event is unset before and set after the state
        changes to CONNECTED; the callback is called once with that state.
        """

        xknx = XKNX()
        connection_state_changed_cb = Mock()
        xknx.connection_manager.register_connection_state_changed_cb(
            connection_state_changed_cb
        )

        assert not xknx.connection_manager.connected.is_set()

        xknx.connection_manager.connection_state_changed(XknxConnectionState.CONNECTED)
        connection_state_changed_cb.assert_called_once_with(
            XknxConnectionState.CONNECTED
        )

        assert xknx.connection_manager.connected.is_set()

    async def test_threaded_connection(self) -> None:
        """Test starting threaded connection.

        The state change is triggered from the patched interface start
        routine, which asserts that it does not run in the test's thread.
        The registered callback asserts that it runs in the test's thread.
        """
        # Captured by the closures below; no instance attribute required.
        main_thread = threading.get_ident()
        xknx = XKNX(connection_config=ConnectionConfig(threaded=True))

        def assert_main_thread(*args: Any, **kwargs: Any) -> None:
            """Test callback is done by main thread."""
            assert main_thread == threading.get_ident()

        xknx.connection_manager.register_connection_state_changed_cb(assert_main_thread)

        async def set_connected() -> None:
            """Set connected state."""
            xknx.connection_manager.connection_state_changed(
                XknxConnectionState.CONNECTED
            )
            assert main_thread != threading.get_ident()

        with patch("xknx.io.KNXIPInterface._start", side_effect=set_connected):
            await xknx.start()
            try:
                # wait for side_effect to finish
                async with asyncio_timeout(1):
                    await xknx.connection_manager.connected.wait()
                    await asyncio.sleep(0)
            finally:
                # Always shut down, even if the wait above timed out or failed.
                await xknx.stop()

    async def test_connection_information(self) -> None:
        """Test connection information.

        Checks that the CEMI counters are reset and the timestamp and
        connection type are set on CONNECTED, and that on DISCONNECTED the
        counters are kept while timestamp and connection type are cleared.
        """
        xknx = XKNX()

        assert xknx.connection_manager.connected_since is None
        assert (
            xknx.connection_manager.connection_type is XknxConnectionType.NOT_CONNECTED
        )
        xknx.connection_manager.cemi_count_incoming = 5
        xknx.connection_manager.cemi_count_incoming_error = 5
        xknx.connection_manager.cemi_count_outgoing = 5
        xknx.connection_manager.cemi_count_outgoing_error = 5

        # reset counters on new connection
        xknx.connection_manager.connection_state_changed(
            XknxConnectionState.CONNECTED, XknxConnectionType.TUNNEL_TCP
        )
        assert xknx.connection_manager.cemi_count_incoming == 0
        assert xknx.connection_manager.cemi_count_incoming_error == 0
        assert xknx.connection_manager.cemi_count_outgoing == 0
        assert xknx.connection_manager.cemi_count_outgoing_error == 0
        assert isinstance(xknx.connection_manager.connected_since, datetime)
        assert xknx.connection_manager.connection_type is XknxConnectionType.TUNNEL_TCP

        xknx.connection_manager.cemi_count_incoming = 5
        xknx.connection_manager.cemi_count_incoming_error = 5
        xknx.connection_manager.cemi_count_outgoing = 5
        xknx.connection_manager.cemi_count_outgoing_error = 5
        # keep values until new connection; set connection timestamp to None
        xknx.connection_manager.connection_state_changed(
            XknxConnectionState.DISCONNECTED
        )
        assert xknx.connection_manager.cemi_count_incoming == 5
        assert xknx.connection_manager.cemi_count_incoming_error == 5
        assert xknx.connection_manager.cemi_count_outgoing == 5
        assert xknx.connection_manager.cemi_count_outgoing_error == 5
        assert xknx.connection_manager.connected_since is None
        assert (
            xknx.connection_manager.connection_type is XknxConnectionType.NOT_CONNECTED
        )