File: test_auth_metrics.py

package info (click to toggle)
python-snitun 0.45.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 664 kB
  • sloc: python: 6,703; sh: 5; makefile: 3
file content (108 lines) | stat: -rw-r--r-- 3,749 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Test authentication metrics."""

from unittest.mock import AsyncMock, MagicMock, call

import pytest

from snitun.exceptions import SniTunChallengeError, SniTunInvalidPeer
from snitun.server.listener_peer import PeerListener
from snitun.server.peer_manager import PeerManager


@pytest.mark.asyncio
async def test_auth_success_metrics() -> None:
    """Check the metric increments recorded for a successful authentication.

    The peer manager returns a peer whose challenge coroutine completes
    without raising. The metrics mock must then have been incremented exactly
    twice: once for the started attempt and once for the success.
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.attempts", tags={"result": "success"}),
    ]

    # Peer stub: reports as not connected and its challenge awaits cleanly.
    peer_stub = MagicMock(
        is_connected=False,
        init_multiplexer_challenge=AsyncMock(),
    )
    manager_stub = MagicMock(spec=PeerManager)
    manager_stub.create_peer.return_value = peer_stub

    # Stream stubs: the writer's transport reports that it is not closing.
    stream_writer = MagicMock()
    stream_writer.close = MagicMock()
    stream_writer.transport.is_closing.return_value = False
    stream_reader = AsyncMock()

    metrics_stub = MagicMock()
    peer_listener = PeerListener(manager_stub, metrics=metrics_stub)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    increment = metrics_stub.increment
    assert increment.call_count == 2
    increment.assert_has_calls(expected_calls)


@pytest.mark.asyncio
async def test_auth_invalid_token_metrics() -> None:
    """Check the metric increments recorded when peer creation is rejected.

    ``create_peer`` is made to raise ``SniTunInvalidPeer``. The metrics mock
    must then have been incremented exactly twice: the started attempt
    followed by a failure tagged with reason ``invalid_token``.
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.failures", tags={"reason": "invalid_token"}),
    ]

    # Peer manager stub that refuses every token it is handed.
    manager_stub = MagicMock(spec=PeerManager)
    manager_stub.create_peer.side_effect = SniTunInvalidPeer("Invalid token")

    # Stream stubs: the writer's transport reports that it is not closing.
    stream_writer = MagicMock()
    stream_writer.close = MagicMock()
    stream_writer.transport.is_closing.return_value = False
    stream_reader = AsyncMock()

    metrics_stub = MagicMock()
    peer_listener = PeerListener(manager_stub, metrics=metrics_stub)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"invalid_token",
    )

    increment = metrics_stub.increment
    assert increment.call_count == 2
    increment.assert_has_calls(expected_calls)


@pytest.mark.asyncio
async def test_auth_challenge_failed_metrics() -> None:
    """Check the metric increments recorded when the peer challenge fails.

    The peer's ``init_multiplexer_challenge`` coroutine is made to raise
    ``SniTunChallengeError``. The metrics mock must then have been incremented
    exactly twice: the started attempt followed by a failure tagged with
    reason ``challenge_failed``.
    """
    expected_calls = [
        call("snitun.auth.attempts", tags={"result": "started"}),
        call("snitun.auth.failures", tags={"reason": "challenge_failed"}),
    ]

    # Peer stub whose challenge coroutine raises when awaited.
    failing_challenge = AsyncMock(
        side_effect=SniTunChallengeError("Challenge failed"),
    )
    peer_stub = MagicMock(init_multiplexer_challenge=failing_challenge)
    manager_stub = MagicMock(spec=PeerManager)
    manager_stub.create_peer.return_value = peer_stub

    # Stream stubs: the writer's transport reports that it is not closing.
    stream_writer = MagicMock()
    stream_writer.close = MagicMock()
    stream_writer.transport.is_closing.return_value = False
    stream_reader = AsyncMock()

    metrics_stub = MagicMock()
    peer_listener = PeerListener(manager_stub, metrics=metrics_stub)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    increment = metrics_stub.increment
    assert increment.call_count == 2
    increment.assert_has_calls(expected_calls)


@pytest.mark.asyncio
async def test_no_metrics_does_not_crash() -> None:
    """Check that a listener built with ``metrics=None`` handles a connection.

    The test passes when ``handle_connection`` returns without raising and the
    peer manager's ``create_peer`` has been called exactly once.
    """
    # Peer stub: reports as not connected and its challenge awaits cleanly.
    peer_stub = MagicMock(
        is_connected=False,
        init_multiplexer_challenge=AsyncMock(),
    )
    manager_stub = MagicMock(spec=PeerManager)
    manager_stub.create_peer.return_value = peer_stub

    # Stream stubs: the writer's transport reports that it is not closing.
    stream_writer = MagicMock()
    stream_writer.close = MagicMock()
    stream_writer.transport.is_closing.return_value = False
    stream_reader = AsyncMock()

    peer_listener = PeerListener(manager_stub, metrics=None)
    await peer_listener.handle_connection(
        stream_reader,
        stream_writer,
        data=b"test_token",
    )

    manager_stub.create_peer.assert_called_once()