1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
|
"""Test authentication metrics."""
from unittest.mock import AsyncMock, MagicMock, call
import pytest
from snitun.exceptions import SniTunChallengeError, SniTunInvalidPeer
from snitun.server.listener_peer import PeerListener
from snitun.server.peer_manager import PeerManager
@pytest.mark.asyncio
async def test_auth_success_metrics() -> None:
    """Check the metric calls recorded when the peer challenge succeeds.

    Exactly two ``increment`` calls are expected, in order: an attempt
    tagged "started" followed by an attempt tagged "success".
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.attempts", tags={"result": "success"}),
    ]

    # Peer stub: reports itself as disconnected and exposes an awaitable
    # challenge that completes without raising.
    peer = MagicMock(is_connected=False, init_multiplexer_challenge=AsyncMock())
    manager = MagicMock(spec=PeerManager)
    manager.create_peer.return_value = peer

    # Stream stubs handed to the listener.
    stream_reader = AsyncMock()
    stream_writer = MagicMock(close=MagicMock())
    stream_writer.transport.is_closing.return_value = False

    metrics = MagicMock()
    peer_listener = PeerListener(manager, metrics=metrics)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    recorded = metrics.increment
    assert recorded.call_count == 2
    recorded.assert_has_calls(expected_calls)
@pytest.mark.asyncio
async def test_auth_invalid_token_metrics() -> None:
    """Check the metric calls recorded when peer creation rejects the token.

    ``create_peer`` is made to raise ``SniTunInvalidPeer``; exactly two
    ``increment`` calls are expected, in order: an attempt tagged "started"
    followed by a failure tagged "invalid_token".
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.failures", tags={"reason": "invalid_token"}),
    ]

    # Manager stub whose peer factory fails with an invalid-peer error.
    manager = MagicMock(spec=PeerManager)
    manager.create_peer.side_effect = SniTunInvalidPeer("Invalid token")

    # Stream stubs handed to the listener.
    stream_reader = AsyncMock()
    stream_writer = MagicMock(close=MagicMock())
    stream_writer.transport.is_closing.return_value = False

    metrics = MagicMock()
    peer_listener = PeerListener(manager, metrics=metrics)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"invalid_token",
    )

    recorded = metrics.increment
    assert recorded.call_count == 2
    recorded.assert_has_calls(expected_calls)
@pytest.mark.asyncio
async def test_auth_challenge_failed_metrics() -> None:
    """Check the metric calls recorded when the peer challenge raises.

    The peer's ``init_multiplexer_challenge`` is made to raise
    ``SniTunChallengeError``; exactly two ``increment`` calls are expected,
    in order: an attempt tagged "started" followed by a failure tagged
    "challenge_failed".
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.failures", tags={"reason": "challenge_failed"}),
    ]

    # Peer stub whose awaitable challenge always fails.
    failing_challenge = AsyncMock(
        side_effect=SniTunChallengeError("Challenge failed"),
    )
    peer = MagicMock(init_multiplexer_challenge=failing_challenge)
    manager = MagicMock(spec=PeerManager)
    manager.create_peer.return_value = peer

    # Stream stubs handed to the listener.
    stream_reader = AsyncMock()
    stream_writer = MagicMock(close=MagicMock())
    stream_writer.transport.is_closing.return_value = False

    metrics = MagicMock()
    peer_listener = PeerListener(manager, metrics=metrics)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    recorded = metrics.increment
    assert recorded.call_count == 2
    recorded.assert_has_calls(expected_calls)
@pytest.mark.asyncio
async def test_no_metrics_does_not_crash() -> None:
    """Check that a listener built with ``metrics=None`` handles a connection.

    The test passes when ``handle_connection`` returns without raising and
    the manager's ``create_peer`` was called exactly once.
    """
    # Peer stub: reports itself as disconnected and exposes an awaitable
    # challenge that completes without raising.
    peer = MagicMock(is_connected=False, init_multiplexer_challenge=AsyncMock())
    manager = MagicMock(spec=PeerManager)
    manager.create_peer.return_value = peer

    # Stream stubs handed to the listener.
    stream_reader = AsyncMock()
    stream_writer = MagicMock(close=MagicMock())
    stream_writer.transport.is_closing.return_value = False

    peer_listener = PeerListener(manager, metrics=None)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    manager.create_peer.assert_called_once()
|