File: test_authentication.py

package info (click to toggle)
amqtt 0.11.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,660 kB
  • sloc: python: 14,565; sh: 42; makefile: 34; javascript: 27
file content (129 lines) | stat: -rw-r--r-- 4,627 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import asyncio
import logging
from pathlib import Path
import unittest

import pytest

from amqtt.plugins.authentication import AnonymousAuthPlugin, FileAuthPlugin
from amqtt.contexts import BaseContext
from amqtt.plugins.base import BaseAuthPlugin
from amqtt.session import Session

# Verbose record layout (timestamp, logger name, source file:line, level, message)
# so that failures in these tests are easy to trace back to the emitting code.
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(format=formatter, level=logging.DEBUG)


@pytest.mark.asyncio
async def test_base_no_config(logdog):
    """Check BaseAuthPlugin.authenticate returns False when the config has no 'auth' section.

    Also verifies that exactly one WARNING record, reporting the missing
    'auth' section, is captured on the "testlog" logger.
    """
    with logdog() as pile:
        ctx = BaseContext()
        ctx.logger = logging.getLogger("testlog")
        ctx.config = {}  # deliberately empty: no 'auth' section

        plugin = BaseAuthPlugin(ctx)
        is_authenticated = await plugin.authenticate(session=Session())
        assert is_authenticated is False

        # Warning messages are only generated if using deprecated plugin configuration on initial load
        captured = list(pile.drain(name="testlog"))
        assert len(captured) == 1
        warning = captured[0]
        assert warning.levelno == logging.WARNING
        assert warning.message == "'auth' section not found in context configuration"


class TestAnonymousAuthPlugin(unittest.TestCase):
    """Exercise AnonymousAuthPlugin with anonymous (empty) and named usernames.

    Every test drives the plugin's ``authenticate`` coroutine on a private
    event loop that is created in ``setUp`` and closed in ``tearDown``.
    """

    def setUp(self) -> None:
        self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()

    def tearDown(self) -> None:
        # Close the per-test loop; without this every test leaves an open
        # event loop behind, which Python reports as a ResourceWarning.
        self.loop.close()

    def _authenticate(self, config: object, username: str):
        """Run ``AnonymousAuthPlugin.authenticate`` and return its result.

        Args:
            config: Value assigned to ``context.config`` (a dict or a
                plugin ``Config`` instance).
            username: Username set on the session being authenticated.
        """
        context = BaseContext()
        context.logger = logging.getLogger(__name__)
        context.config = config
        session = Session()
        session.username = username
        auth_plugin = AnonymousAuthPlugin(context)
        return self.loop.run_until_complete(auth_plugin.authenticate(session=session))

    def test_allow_anonymous_dict_config(self) -> None:
        """An empty username is accepted when a dict config allows anonymous."""
        ret = self._authenticate({"auth": {"allow-anonymous": True}}, username="")
        assert ret

    def test_allow_anonymous_dataclass_config(self) -> None:
        """An empty username is accepted when a Config object allows anonymous."""
        ret = self._authenticate(AnonymousAuthPlugin.Config(allow_anonymous=True), username="")
        assert ret

    def test_disallow_anonymous(self) -> None:
        """An empty username is rejected when anonymous access is disabled."""
        ret = self._authenticate({"auth": {"allow-anonymous": False}}, username="")
        assert not ret

    def test_allow_nonanonymous(self) -> None:
        """A non-empty username is accepted even with anonymous access disabled."""
        ret = self._authenticate({"auth": {"allow-anonymous": False}}, username="test")
        assert ret


class TestFileAuthPlugin(unittest.TestCase):
    """Exercise FileAuthPlugin against the ``passwd`` file next to this module.

    Every test drives the plugin's ``authenticate`` coroutine on a private
    event loop that is created in ``setUp`` and closed in ``tearDown``.
    """

    def setUp(self) -> None:
        self.loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()

    def tearDown(self) -> None:
        # Close the per-test loop; without this every test leaves an open
        # event loop behind, which Python reports as a ResourceWarning.
        self.loop.close()

    def _authenticate(self, username: str, password: str):
        """Run ``FileAuthPlugin.authenticate`` and return its result.

        The plugin is configured with the ``passwd`` file located in the same
        directory as this test module.

        Args:
            username: Username set on the session being authenticated.
            password: Password set on the session being authenticated.
        """
        context = BaseContext()
        context.logger = logging.getLogger(__name__)
        context.config = {
            "auth": {
                "password-file": Path(__file__).parent / "passwd",
            },
        }
        session = Session()
        session.username = username
        session.password = password
        auth_plugin = FileAuthPlugin(context)
        return self.loop.run_until_complete(auth_plugin.authenticate(session=session))

    def test_allow(self) -> None:
        """The "user"/"test" credentials are accepted."""
        ret = self._authenticate("user", "test")
        assert ret

    def test_wrong_password(self) -> None:
        """The username "user" with a wrong password is rejected."""
        ret = self._authenticate("user", "wrong password")
        assert not ret

    def test_unknown_password(self) -> None:
        """The "some user"/"some password" credentials are rejected.

        NOTE(review): despite the test name, the username differs here too,
        so this presumably covers an unknown user; confirm against the
        contents of the passwd fixture.
        """
        ret = self._authenticate("some user", "some password")
        assert not ret