1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
"""Unit test for KNX/IP TunnellingRequest objects."""
import pytest
from xknx.exceptions import CouldNotParseKNXIP
from xknx.knxip import (
ErrorCode,
KNXIPFrame,
TunnellingFeatureGet,
TunnellingFeatureInfo,
TunnellingFeatureResponse,
TunnellingFeatureSet,
TunnellingFeatureType,
)
from xknx.management.application_layer_enum import ReturnCode
class TestKNXIPTunnellingFeature:
"""Test class for KNX/IP TunnelingFeature objects."""
def test_get(self) -> None:
"""Test parsing and streaming connection tunneling feature get KNX/IP packet."""
raw = bytes.fromhex("06 10 04 22 00 0c 04 01 17 00 03 00")
knxipframe, _ = KNXIPFrame.from_knx(raw)
assert isinstance(knxipframe.body, TunnellingFeatureGet)
assert knxipframe.body.communication_channel_id == 1
assert knxipframe.body.sequence_counter == 23
assert (
knxipframe.body.feature_type == TunnellingFeatureType.BUS_CONNECTION_STATUS
)
assert len(knxipframe.body.data) == 0
tunnelling_request = TunnellingFeatureGet(
communication_channel_id=1,
sequence_counter=23,
feature_type=TunnellingFeatureType.BUS_CONNECTION_STATUS,
)
knxipframe2 = KNXIPFrame.init_from_body(tunnelling_request)
assert knxipframe2.to_knx() == raw
def test_info(self) -> None:
"""Test parsing and streaming connection tunneling feature info KNX/IP packet."""
raw = bytes.fromhex("06 10 04 25 00 0e 04 01 17 00 03 00 01 00")
knxipframe, _ = KNXIPFrame.from_knx(raw)
assert isinstance(knxipframe.body, TunnellingFeatureInfo)
assert knxipframe.body.communication_channel_id == 1
assert knxipframe.body.sequence_counter == 23
assert (
knxipframe.body.feature_type == TunnellingFeatureType.BUS_CONNECTION_STATUS
)
assert knxipframe.body.data == b"\x01\x00"
tunnelling_request = TunnellingFeatureInfo(
communication_channel_id=1,
sequence_counter=23,
feature_type=TunnellingFeatureType.BUS_CONNECTION_STATUS,
data=b"\x01\x00",
)
knxipframe2 = KNXIPFrame.init_from_body(tunnelling_request)
assert knxipframe2.to_knx() == raw
def test_response(self) -> None:
"""Test parsing and streaming connection tunneling feature response KNX/IP packet."""
raw = bytes.fromhex("06 10 04 23 00 0e 04 01 17 00 03 00 01 00")
knxipframe, _ = KNXIPFrame.from_knx(raw)
assert isinstance(knxipframe.body, TunnellingFeatureResponse)
assert knxipframe.body.communication_channel_id == 1
assert knxipframe.body.sequence_counter == 23
assert knxipframe.body.status_code == ErrorCode.E_NO_ERROR
assert (
knxipframe.body.feature_type == TunnellingFeatureType.BUS_CONNECTION_STATUS
)
assert knxipframe.body.return_code == ReturnCode.E_SUCCESS
assert knxipframe.body.data == b"\x01\x00"
tunnelling_request = TunnellingFeatureResponse(
communication_channel_id=1,
sequence_counter=23,
feature_type=TunnellingFeatureType.BUS_CONNECTION_STATUS,
return_code=ReturnCode.E_SUCCESS,
data=b"\x01\x00",
)
knxipframe2 = KNXIPFrame.init_from_body(tunnelling_request)
assert knxipframe2.to_knx() == raw
def test_set(self) -> None:
"""Test parsing and streaming connection tunneling feature set KNX/IP packet."""
raw = bytes.fromhex("06 10 04 24 00 0e 04 01 17 00 08 00 01 00")
knxipframe, _ = KNXIPFrame.from_knx(raw)
assert isinstance(knxipframe.body, TunnellingFeatureSet)
assert knxipframe.body.communication_channel_id == 1
assert knxipframe.body.sequence_counter == 23
assert (
knxipframe.body.feature_type
== TunnellingFeatureType.INTERFACE_FEATURE_INFO_ENABLE
)
assert knxipframe.body.data == b"\x01\x00"
tunnelling_request = TunnellingFeatureSet(
communication_channel_id=1,
sequence_counter=23,
feature_type=TunnellingFeatureType.INTERFACE_FEATURE_INFO_ENABLE,
data=b"\x01\x00",
)
knxipframe2 = KNXIPFrame.init_from_body(tunnelling_request)
assert knxipframe2.to_knx() == raw
def test_from_knx_wrong_header(self) -> None:
"""Test parsing and streaming wrong Tunnelling Feature (wrong header length byte)."""
raw = bytes.fromhex("06 10 04 22 00 0c 06 01 17 00 03 00")
with pytest.raises(CouldNotParseKNXIP):
KNXIPFrame.from_knx(raw)
def test_get_with_data(self) -> None:
"""Test parsing and streaming wrong Get (unexpected data)."""
raw = bytes.fromhex("06 10 04 22 00 0e 04 01 17 00 03 00 01 00")
with pytest.raises(CouldNotParseKNXIP):
KNXIPFrame.from_knx(raw)
def test_set_without_data(self) -> None:
"""Test parsing and streaming wrong Get (missing data)."""
raw = bytes.fromhex("06 10 04 24 00 0c 04 01 17 00 03 00")
with pytest.raises(CouldNotParseKNXIP):
KNXIPFrame.from_knx(raw)
|