1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291
|
"""Tests for the masking tools."""
import hashlib
import hmac
import importlib
import sys
import threading
from types import ModuleType
import pytest
from asusrouter.tools.identifiers import MacAddress
from asusrouter.tools.masking import _read_key_from_string
# Dotted import path of the module under test; the reload helper removes it
# from the module cache and imports it again.
MODULE = "asusrouter.tools.masking"
# Name of the environment variable the reload helper sets before importing.
ENV_KEY_NAME = "ASUSROUTER_MASK_KEY"
# Default test key: 64 hex characters, i.e. 32 bytes once hex-decoded.
TEST_KEY = "0f" * 32
# Sample MAC address passed to mask_mac throughout the tests.
TEST_MAC = "aa:bb:cc:11:22:33"
def _reload_masking_with_env(
    monkeypatch: pytest.MonkeyPatch, env_value: str
) -> ModuleType:
    """Import a fresh copy of the masking module under a given env value.

    Args:
        monkeypatch: Pytest fixture used to patch both the environment and
            ``sys.modules``; it undoes these changes at test teardown.
        env_value: Value assigned to the ``ENV_KEY_NAME`` environment
            variable before the module is imported.

    Returns:
        The module object produced by the new import of ``MODULE``.
    """
    monkeypatch.setenv(ENV_KEY_NAME, env_value)
    # Remove any cached module so the import below runs the module body again
    # and the env var is read at import time. Doing this through monkeypatch
    # (instead of ``sys.modules.pop``) puts the previously cached entry back
    # at teardown, so the copy imported here does not stay registered for
    # tests that run afterwards.
    monkeypatch.delitem(sys.modules, MODULE, raising=False)
    return importlib.import_module(MODULE)
def _reload_masking(monkeypatch: pytest.MonkeyPatch) -> ModuleType:
    """Reload the masking module with the default test key in the env."""
    fresh_module = _reload_masking_with_env(monkeypatch, env_value=TEST_KEY)
    return fresh_module
@pytest.mark.parametrize(
    ("key_str", "key"),
    [
        # lowercase hex of even length is decoded to raw bytes
        ("0f" * 32, b"\x0f" * 32),
        # uppercase hex decodes to the same raw bytes
        ("0F" * 32, b"\x0f" * 32),
        # the empty string yields an empty key
        ("", b""),
        # a plain non-hex string is expected back as its encoded bytes
        ("my-secret-key", b"my-secret-key"),
        # even length but non-hex characters -> encoded bytes as well
        ("zz" * 16, b"zz" * 16),
        # odd length cannot be hex -> encoded bytes
        ("abc", b"abc"),
    ],
)
def test_read_key_from_string(key_str: str, key: bytes) -> None:
    """Check the bytes produced by _read_key_from_string for each input."""
    parsed = _read_key_from_string(key_str)
    assert parsed == key
@pytest.mark.parametrize("key", [TEST_KEY, "1234567890"])
def test_env_load(key: str, monkeypatch: pytest.MonkeyPatch) -> None:
    """Verify that the key placed in the environment is the one loaded."""
    # Import the module with the parametrized key exported in the env
    masking = _reload_masking_with_env(monkeypatch, key)

    # Inspect the loaded key through the public accessor only
    assert masking.get_key_hex().lower() == key.lower()
    loaded_raw = bytes.fromhex(masking.get_key_hex())
    assert loaded_raw == bytes.fromhex(key)
def test_configure_key_bytes(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A bytes key passed to configure_key is stored unchanged."""
    masking = _reload_masking(monkeypatch)

    # Configure a bytes key
    raw_key = b"13" * 32
    masking.configure_key(raw_key)

    # The reported hex must be the hex form of exactly those bytes
    reported_hex = masking.get_key_hex()
    assert reported_hex == raw_key.hex()
def test_configure_key_hex_string(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A hex string passed to configure_key is reported back as that hex."""
    masking = _reload_masking(monkeypatch)

    # Configure the key from its hex representation
    hex_key = "13" * 32
    masking.configure_key(hex_key)

    # The accessor is compared against the lowercase form of the input
    reported_hex = masking.get_key_hex()
    assert reported_hex == hex_key.lower()
def test_configure_key_any_string(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A non-hex string passed to configure_key is stored as encoded bytes."""
    masking = _reload_masking(monkeypatch)

    # Configure the key from a plain, non-hex string
    secret = "my-secret-key"
    masking.configure_key(secret)

    # The stored key must be the encoded bytes of that string
    reported_hex = masking.get_key_hex()
    assert reported_hex == secret.encode().hex()
def test_configure_key_none(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Passing None to configure_key replaces the current key."""
    masking = _reload_masking(monkeypatch)

    # Start from a known, fixed key
    masking.configure_key(b"\x00" * 32)
    fixed_hex = masking.get_key_hex()

    # Reconfigure with None and read the key again
    masking.configure_key(None)
    replaced_hex = masking.get_key_hex()

    # A key is still reported as a string ...
    assert isinstance(replaced_hex, str)
    # ... and it is no longer the fixed one
    assert fixed_hex != replaced_hex
def test_configure_key_affects_output(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The masked MAC depends on the configured key and only on it."""
    masking = _reload_masking(monkeypatch)
    address = MacAddress.from_value(TEST_MAC)
    key_a = b"A" * 32
    key_b = b"B" * 32

    # Mask under key A
    masking.configure_key(key_a)
    masked_a = masking.mask_mac(address)

    # Mask under key B
    masking.configure_key(key_b)
    masked_b = masking.mask_mac(address)

    # Different keys must give different masks
    assert masked_a != masked_b

    # Returning to key A must reproduce the first mask
    masking.configure_key(key_a)
    masked_a_again = masking.mask_mac(address)
    assert masked_a == masked_a_again
def test_masking_with_env(monkeypatch: pytest.MonkeyPatch) -> None:
    """Masking with the env-provided key is deterministic and alters the MAC."""
    masking = _reload_masking(monkeypatch)
    source_mac = TEST_MAC

    first = masking.mask_mac(source_mac)
    second = masking.mask_mac(source_mac)

    # Both calls must return MacAddress instances
    for masked in (first, second):
        assert isinstance(masked, MacAddress)

    # Same input and key -> same masked value
    assert first == second

    # The masked value must not equal the original address
    assert first != MacAddress.from_value(source_mac)
def test_masked_mac_locally_administered(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """The masked MAC has the local bit set and the multicast bit clear."""
    masking = _reload_masking(monkeypatch)

    # Only the first octet carries the two flag bits checked here
    first_octet = masking.mask_mac(TEST_MAC).to_bytes()[0]
    local_bit = first_octet & 0x02
    multicast_bit = first_octet & 0x01

    # Locally-administered bit must be set
    assert local_bit != 0
    # Multicast bit must be cleared
    assert multicast_bit == 0
def test_mask_mac_invalid_input_raises(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """mask_mac must raise a ValueError mentioning MAC for a non-MAC string."""
    masking = _reload_masking(monkeypatch)

    expect_mac_error = pytest.raises(ValueError, match="MAC")
    with expect_mac_error:
        masking.mask_mac("not-a-mac")
def test_configure_key_thread_safe(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test concurrent configure_key + mask_mac behavior.

    Writer threads keep reconfiguring the key while reader threads mask the
    same MAC. The test passes only if no thread raised and every reader
    iteration recorded a result.
    """
    mod = _reload_masking(monkeypatch)

    workers = 4
    iterations = 50

    # An exception raised inside a thread does not propagate to the thread
    # that joins it, so each thread records its own failures for the
    # assertions at the end of the test.
    writer_errors: list[str] = []
    results: list[bytes | tuple[str, str]] = []

    # Writer changes the key repeatedly
    def writer(i: int) -> None:
        k = bytes([i % 256]) * 32
        for _ in range(iterations):
            try:
                mod.configure_key(k)
            except Exception as ex:  # noqa: BLE001
                writer_errors.append(repr(ex))

    # Reader calls mask_mac repeatedly and records success/failure
    def reader() -> None:
        for _ in range(iterations):
            try:
                m = mod.mask_mac(TEST_MAC)
                results.append(m.to_bytes())
            except Exception as ex:  # noqa: BLE001
                results.append(("err", str(ex)))

    writers = [
        threading.Thread(target=writer, args=(i,)) for i in range(workers)
    ]
    readers = [threading.Thread(target=reader) for _ in range(workers)]

    for t in writers + readers:
        t.start()
    for t in writers + readers:
        t.join()

    # Ensure no writer failed while reconfiguring the key
    assert writer_errors == []

    # Ensure no reader observed exceptions
    reader_errors = [
        r for r in results if isinstance(r, tuple) and r[0] == "err"
    ]
    assert reader_errors == []

    # Each reader iteration appends exactly one entry, so a shortfall means
    # a reader stopped before completing its loop
    assert len(results) == workers * iterations
def test_hmac_digest_thread_safe(monkeypatch: pytest.MonkeyPatch) -> None:
    """Concurrent _hmac_digest calls must all return the reference digest."""
    masking = _reload_masking(monkeypatch)

    payload = b"concurrent-payload"
    # Reference value computed directly with the key exported in the env
    reference = hmac.new(
        bytes.fromhex(TEST_KEY), payload, hashlib.sha256
    ).digest()

    worker_total = 16
    # One slot per worker; a slot left as None means that worker never
    # stored a digest
    digests: list[bytes | None] = [None] * worker_total

    def worker(idx: int) -> None:
        """Store the digest computed with no explicit key in slot idx."""
        digests[idx] = masking._hmac_digest(payload, None)

    pool = [
        threading.Thread(target=worker, args=(slot,))
        for slot in range(worker_total)
    ]
    for thread in pool:
        thread.start()
    for thread in pool:
        thread.join()

    for digest in digests:
        assert digest == reference
def test_hmac_digest_with_key(monkeypatch: pytest.MonkeyPatch) -> None:
    """_hmac_digest with an explicit key matches hmac with SHA-256."""
    masking = _reload_masking(monkeypatch)

    explicit_key = bytes.fromhex(TEST_KEY)
    payload = b"payload"

    # Reference digest computed with the standard library
    reference = hmac.new(explicit_key, payload, hashlib.sha256).digest()

    computed = masking._hmac_digest(payload, explicit_key)
    assert computed == reference
|