1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
|
"""Unit test for keyring reader."""
from pathlib import Path
import pytest
from xknx.exceptions.exception import InvalidSecureConfiguration
from xknx.secure.keyring import (
InterfaceType,
Keyring,
XMLDevice,
XMLInterface,
sync_load_keyring,
verify_keyring_signature,
)
from xknx.telegram import GroupAddress, IndividualAddress
class TestKeyRing:
"""Test class for keyring."""
keyring_test_file = Path(__file__).parent / "resources/keyring.knxkeys"
testcase_file: str = Path(__file__).parent / "resources/testcase.knxkeys"
special_chars_file = (
Path(__file__).parent / "resources/special_chars_secure_tunnel.knxkeys"
)
data_secure_ip = (
Path(__file__).parent / "resources/DataSecure_only_one_interface.knxkeys"
)
data_secure_usb = Path(__file__).parent / "resources/DataSecure_usb.knxkeys"
@staticmethod
def assert_interface(
keyring: Keyring, password: str, ia: IndividualAddress
) -> None:
"""Verify password for given user."""
matched = False
if interface := keyring.get_tunnel_interface_by_individual_address(ia):
matched = True
assert interface.decrypted_password == password
assert matched
def test_load_keyring_test(self) -> None:
"""Test load keyring from knxkeys file."""
keyring = sync_load_keyring(self.keyring_test_file, "pwd")
TestKeyRing.assert_interface(keyring, "user4", IndividualAddress("1.1.4"))
TestKeyRing.assert_interface(keyring, "@zvI1G&_", IndividualAddress("1.1.6"))
TestKeyRing.assert_interface(keyring, "ZvDY-:g#", IndividualAddress("1.1.7"))
TestKeyRing.assert_interface(keyring, "user2", IndividualAddress("1.1.2"))
assert keyring.backbone.multicast_address == "224.0.23.12"
assert keyring.backbone.latency == 1000
assert keyring.backbone.decrypted_key == bytes.fromhex(
"96f034fccf510760cbd63da0f70d4a9d"
)
def test_load_testcase_file(self) -> None:
"""Test load keyring from knxkeys file."""
keyring = sync_load_keyring(self.testcase_file, "password")
TestKeyRing.assert_interface(keyring, "user1", IndividualAddress("1.0.1"))
TestKeyRing.assert_interface(keyring, "user2", IndividualAddress("1.0.11"))
TestKeyRing.assert_interface(keyring, "user3", IndividualAddress("1.0.12"))
TestKeyRing.assert_interface(keyring, "user4", IndividualAddress("1.0.13"))
assert keyring.devices[0].decrypted_management_password == "commissioning"
assert keyring.backbone.multicast_address == "224.0.23.12"
assert keyring.backbone.latency == 1000
assert keyring.backbone.decrypted_key == bytes.fromhex(
"cf89fd0f18f4889783c7ef44ee1f5e14"
)
interface: XMLInterface = keyring.interfaces[0]
device: XMLDevice = keyring.get_device_by_interface(interface)
assert device is not None
assert device.decrypted_authentication == "authenticationcode"
def test_load_special_chars_file(self) -> None:
"""Test load keyring from knxkeys file."""
keyring = sync_load_keyring(self.special_chars_file, "test")
TestKeyRing.assert_interface(keyring, "tunnel_2", IndividualAddress("1.0.2"))
TestKeyRing.assert_interface(keyring, "tunnel_3", IndividualAddress("1.0.3"))
TestKeyRing.assert_interface(keyring, "tunnel_4", IndividualAddress("1.0.4"))
TestKeyRing.assert_interface(keyring, "tunnel_5", IndividualAddress("1.0.5"))
TestKeyRing.assert_interface(keyring, "tunnel_6", IndividualAddress("1.0.6"))
assert keyring.backbone is None
def test_load_data_secure_ip(self) -> None:
"""Test load keyring from knxkeys file."""
keyring = sync_load_keyring(self.data_secure_ip, "test")
assert len(keyring.interfaces) == 1
tunnel = keyring.interfaces[0]
assert tunnel is not None
assert tunnel.password is None
assert tunnel.decrypted_password is None
assert tunnel.user_id is None
assert tunnel.type is InterfaceType.TUNNELING
assert len(tunnel.group_addresses) == 3
assert tunnel.group_addresses[GroupAddress("0/0/1")] == [
IndividualAddress("1.0.1"),
IndividualAddress("1.0.2"),
]
assert tunnel.group_addresses[GroupAddress("0/0/3")] == [
IndividualAddress("1.0.1"),
IndividualAddress("1.0.2"),
]
assert tunnel.group_addresses[GroupAddress("31/7/255")] == []
assert keyring.backbone is None
def test_load_data_secure_usb(self) -> None:
"""Test load keyring from knxkeys file."""
keyring = sync_load_keyring(self.data_secure_usb, "test")
assert len(keyring.interfaces) == 1
interface = keyring.interfaces[0]
assert interface is not None
assert interface.password is None
assert interface.decrypted_password is None
assert interface.user_id is None
assert interface.host is None
assert interface.type is InterfaceType.USB
assert len(interface.group_addresses) == 1
assert interface.group_addresses[GroupAddress("31/7/255")] == [
IndividualAddress("1.0.4")
]
assert keyring.backbone is None
def test_verify_signature(self) -> None:
"""Test signature verification."""
assert verify_keyring_signature(self.keyring_test_file, "pwd")
assert verify_keyring_signature(self.testcase_file, "password")
assert verify_keyring_signature(self.special_chars_file, "test")
def test_invalid_signature(self) -> None:
"""Test invalid signature throws error."""
with pytest.raises(InvalidSecureConfiguration):
sync_load_keyring(self.testcase_file, "wrong_password")
def test_raises_error(self) -> None:
"""Test raises error if password is wrong."""
with pytest.raises(InvalidSecureConfiguration):
sync_load_keyring(
self.testcase_file, "wrong_password", validate_signature=False
)
def test_keyring_get_methods_full(self) -> None:
"""Test keyring get_* methods for full project export."""
keyring = sync_load_keyring(self.keyring_test_file, "pwd")
test_interfaces = keyring.get_tunnel_interfaces_by_host(
host=IndividualAddress("1.1.10")
)
assert len(test_interfaces) == 1
test_interface = test_interfaces[0]
test_device = keyring.get_device_by_interface(interface=test_interface)
assert test_device.individual_address == IndividualAddress("1.1.10")
test_host = keyring.get_tunnel_host_by_interface(
tunnelling_slot=IndividualAddress("1.1.8")
)
assert test_host == IndividualAddress("1.1.0")
test_host = keyring.get_tunnel_host_by_interface(
tunnelling_slot=IndividualAddress("1.1.10")
)
assert test_host is None
test_interface = keyring.get_tunnel_interface_by_host_and_user_id(
host=IndividualAddress("1.1.0"), user_id=4
)
assert test_interface.individual_address == IndividualAddress("1.1.7")
test_interface = keyring.get_tunnel_interface_by_individual_address(
tunnelling_slot=IndividualAddress("1.1.8")
)
assert test_interface.user_id == 8
test_interface = keyring.get_tunnel_interface_by_individual_address(
tunnelling_slot=IndividualAddress("1.1.20")
)
assert test_interface.user_id is None
assert test_interface.host == IndividualAddress("1.1.10")
# this doesn't check for `type`, but there are no other than TUNNELLING interfaces in this keyring
test_interface = keyring.get_interface_by_individual_address(
individual_address=IndividualAddress("1.1.20")
)
assert test_interface.host == IndividualAddress("1.1.10")
full_ga_key_table = keyring.get_data_secure_group_keys()
assert len(full_ga_key_table) == 1
individual_ga_key_table = keyring.get_data_secure_group_keys(
receiver=IndividualAddress("1.1.7")
)
assert len(individual_ga_key_table) == 0
ia_seq_nums = keyring.get_data_secure_senders()
assert len(ia_seq_nums) == 5
def test_keyring_get_methods_one_interface(self) -> None:
"""Test keyring get_* methods for partial export."""
keyring = sync_load_keyring(self.data_secure_ip, "test")
full_ga_key_table = keyring.get_data_secure_group_keys()
assert len(full_ga_key_table) == 3
individual_ga_key_table = keyring.get_data_secure_group_keys(
receiver=IndividualAddress("1.0.4")
)
assert len(individual_ga_key_table) == 3
ia_seq_nums = keyring.get_data_secure_senders()
assert ia_seq_nums == {
IndividualAddress("1.0.1"): 0,
IndividualAddress("1.0.2"): 0,
}
def test_keyring_metadata(self) -> None:
"""Test keyring metadata parsing."""
keyring = sync_load_keyring(self.data_secure_ip, "test")
assert keyring.project_name == "DataSecure_only"
assert keyring.created_by == "ETS 5.7.7 (Build 1428)"
assert keyring.created == "2023-02-06T21:17:09"
assert keyring.xmlns == "http://knx.org/xml/keyring/1"
|