File: mock_pchk.py

package info (click to toggle)
pypck 0.8.10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 528 kB
  • sloc: python: 5,616; makefile: 15
file content (185 lines) | stat: -rw-r--r-- 5,820 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Fake PCHK server used for testing."""

from __future__ import annotations

import asyncio

HOST = "127.0.0.1"
PORT = 4114
USERNAME = "lcn_username"
PASSWORD = "lcn_password"

READ_TIMEOUT = -1
SOCKET_CLOSED = -2
SEPARATOR = b"\n"


async def readuntil_timeout(
    reader: asyncio.StreamReader, separator: bytes, timeout: float
) -> bytes | int:
    """Read from socket with timeout."""
    try:
        data = await asyncio.wait_for(reader.readuntil(separator), timeout)
        data = data.split(separator)[0]
        data = data.split(b"\r")[0]  # remove CR if present
        return data
    except asyncio.TimeoutError:
        return READ_TIMEOUT
    except asyncio.IncompleteReadError:
        return SOCKET_CLOSED


class MockPchkServer:
    """Mock PCHK server for integration tests."""

    def __init__(
        self,
        host: str = HOST,
        port: int = PORT,
        username: str = USERNAME,
        password: str = PASSWORD,
    ):
        """Construct PchkServer."""
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.separator = SEPARATOR
        self.license_error = False
        self.data_received: list[bytes] = []
        self.server: asyncio.AbstractServer | None = None
        self.reader: asyncio.StreamReader | None = None
        self.writer: asyncio.StreamWriter | None = None

    async def run(self) -> None:
        """Start the server."""
        self.server = await asyncio.start_server(
            self.client_connected, host=self.host, port=self.port
        )

    async def stop(self) -> None:
        """Stop the server and close connection."""
        if self.server and self.server.is_serving():
            if not (self.writer is None or self.writer.is_closing()):
                self.writer.close()
                await self.writer.wait_closed()
            self.server.close()
            await self.server.wait_closed()

    async def client_connected(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Client connected callback."""
        # Accept only one connection.
        if self.reader or self.writer:
            return

        self.reader = reader
        self.writer = writer

        auth_ok = await self.authentication()
        if not auth_ok:
            return

        await self.main_loop()

    def set_license_error(self, license_error: bool = False) -> None:
        """Raise a license error during authentication."""
        self.license_error = license_error

    async def authentication(self) -> bool:
        """Run authentication procedure."""
        assert self.writer is not None
        assert self.reader is not None

        self.writer.write(b"LCN-PCK/IP 1.0" + self.separator)
        await self.writer.drain()

        # Ask for username
        self.writer.write(b"Username:" + self.separator)
        await self.writer.drain()

        # Read username input
        data = await readuntil_timeout(self.reader, self.separator, 60)
        if data in [READ_TIMEOUT, SOCKET_CLOSED]:
            return False

        assert isinstance(data, bytes)
        login_username = data.decode()

        # Ask for password
        self.writer.write(b"Password:" + self.separator)
        await self.writer.drain()

        # Read password input
        data = await readuntil_timeout(self.reader, self.separator, 60)
        if data in [READ_TIMEOUT, SOCKET_CLOSED]:
            return False

        assert isinstance(data, bytes)

        login_password = data.decode()
        if login_username == self.username and login_password == self.password:
            self.writer.write(b"OK" + self.separator)
            await self.writer.drain()
        else:
            self.writer.write(b"Authentification failed." + self.separator)
            await self.writer.drain()
            return False

        if self.license_error:
            self.writer.write(b"$err:(license?)" + self.separator)
            await self.writer.drain()
            return False

        return True

    async def main_loop(self) -> None:
        """Query the socket."""
        assert self.reader is not None
        while True:
            # Read data from socket
            data = await readuntil_timeout(self.reader, self.separator, 1.0)
            if data == READ_TIMEOUT:
                continue
            if data == SOCKET_CLOSED:
                break
            assert isinstance(data, bytes)
            await self.process_data(data)

    async def process_data(self, data: bytes) -> None:
        """Process incoming data."""
        assert self.writer is not None
        self.data_received.append(data)
        if data == b"!CHD":
            self.writer.write(b"(dec-mode)" + self.separator)
            await self.writer.drain()

    async def send_message(self, message: str) -> None:
        """Send the given message to the socket."""
        assert self.writer is not None
        self.writer.write(message.encode() + self.separator)
        await self.writer.drain()

    async def received(
        self, message: bytes | str, timeout: int = 5, remove: bool = True
    ) -> bool:
        """Return if given message was received."""
        assert self.writer is not None

        async def receive_loop(data: bytes, remove: bool) -> None:
            while data not in self.data_received:
                await asyncio.sleep(0.05)
            if remove:
                self.data_received.remove(data)

        if isinstance(message, str):
            data = message.encode()
        else:
            data = message

        try:
            await asyncio.wait_for(receive_loop(data, remove), timeout=timeout)
            return True
        except asyncio.TimeoutError:
            return False