File: test_pysignalr.py

package info (click to toggle)
pysignalr 1.3.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 468 kB
  • sloc: python: 1,192; cs: 121; xml: 37; makefile: 28
file content (320 lines) | stat: -rw-r--r-- 11,441 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import asyncio
import atexit
import logging
import time
from contextlib import suppress
from pathlib import Path
from typing import Any
from typing import cast

import _pytest.outcomes
import pytest
import requests
from docker.client import DockerClient  # type: ignore[import-untyped]

from pysignalr.client import SignalRClient
from pysignalr.exceptions import AuthorizationError
from pysignalr.exceptions import ServerError

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


def get_docker_client() -> 'DockerClient':
    """Get Docker client instance if socket is available; skip test otherwise."""

    docker_socks = (
        Path('/var/run/docker.sock'),
        Path.home() / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'vms' / '0' / 'docker.sock',
        Path.home() / 'Library' / 'Containers' / 'com.docker.docker' / 'Data' / 'docker.sock',
    )
    for path in docker_socks:
        if path.exists():
            return DockerClient(base_url=f'unix://{path}')

    raise _pytest.outcomes.Skipped(  # pragma: no cover
        'Docker socket not found',
        allow_module_level=True,
    )


@pytest.fixture(scope='module')
async def aspnet_server() -> str:
    """Run dummy ASPNet server container (destroyed on exit) and return its IP."""
    docker = get_docker_client()

    logging.info('Building ASPNet server image (this may take a while)')
    docker.images.build(
        path=Path(__file__).parent.parent.parent.joinpath('AspNetAuthExample').as_posix(),
        tag='aspnet_server',
    )

    logging.info('Starting ASPNet server container')
    container = docker.containers.run(
        image='aspnet_server',
        environment={
            'ASPNETCORE_ENVIRONMENT': 'Development',
            'ASPNETCORE_URLS': 'http://+:80',
        },
        detach=True,
        remove=True,
    )
    atexit.register(container.stop)
    container.reload()
    ip = cast('str', container.attrs['NetworkSettings']['IPAddress'])

    logging.info('Waiting for server to start')
    wait_for_server(f'http://{ip}/api/auth/login')

    return ip


def wait_for_server(url: str, timeout: int = 30) -> None:
    """
    Waits for the server to be ready.

    Args:
        url (str): The URL to check the server status.
        timeout (int): The maximum time to wait for the server to be ready.
    """
    start = time.time()
    while True:
        try:
            response = requests.post(url, json={'username': 'test', 'password': 'password'}, timeout=10)
            if response.status_code in [200, 401, 403]:
                logging.info('Server is up and running at %s', url)
                break
        except requests.exceptions.RequestException as e:
            logging.info('Waiting for server: %s', e)
        if time.time() - start > timeout:
            raise TimeoutError('Server did not start in time')
        time.sleep(2)


class TestPysignalr:
    async def test_connection(self, aspnet_server: str) -> None:
        """
        Tests connection to the SignalR server.
        """
        url = f'http://{aspnet_server}/weatherHub'
        logging.info('Testing connection to %s', url)
        client = SignalRClient(url)

        task = asyncio.create_task(client.run())

        async def _on_open() -> None:
            logging.info('Connection opened, cancelling task')
            task.cancel()

        client.on_open(_on_open)

        with suppress(asyncio.CancelledError):
            await task

    async def test_connection_with_token(self, aspnet_server: str) -> None:
        """
        Tests connection to the SignalR server with a valid token.
        """
        login_url = f'http://{aspnet_server}/api/auth/login'
        logging.info('Attempting to log in at %s', login_url)
        login_data = {'username': 'test', 'password': 'password'}
        response = requests.post(login_url, json=login_data, timeout=10)
        token = response.json().get('token')
        if not token:
            pytest.fail('Failed to obtain token from login response')

        url = f'http://{aspnet_server}/weatherHub'
        logging.info('Testing connection with token to %s', url)

        def token_factory() -> str:
            return cast('str', token)

        client = SignalRClient(
            url=url,
            access_token_factory=token_factory,
            headers={'mycustomheader': 'mycustomheadervalue'},
        )

        task = asyncio.create_task(client.run())

        async def _on_open() -> None:
            logging.info('Connection with token opened, cancelling task')
            task.cancel()

        client.on_open(_on_open)

        with suppress(asyncio.CancelledError):
            await task

        # Verify the token in the connection headers
        assert 'Authorization' in client._transport._headers
        assert client._transport._headers['Authorization'] == f'Bearer {token}'

    async def test_invalid_token(self, aspnet_server: str) -> None:
        """
        Tests connection to the SignalR server with an invalid token.
        """
        url = f'http://{aspnet_server}/weatherHub'
        logging.info('Testing connection with invalid token to %s', url)

        def invalid_token_factory() -> str:
            return 'invalid_token'  # Simulate an invalid token

        client = SignalRClient(
            url=url,
            access_token_factory=invalid_token_factory,
            headers={'mycustomheader': 'mycustomheadervalue'},
        )

        task = asyncio.create_task(client.run())

        async def _on_open() -> None:
            logging.info('Connection with invalid token opened, cancelling task')
            task.cancel()

        client.on_open(_on_open)

        with suppress(asyncio.CancelledError):
            try:
                await task
            except AuthorizationError:
                logging.info('AuthorizationError caught as expected')
                pass

        # Verify if the AuthorizationError was raised correctly
        assert task.cancelled() is True

    async def test_send_and_receive_message(self, aspnet_server: str) -> None:
        """
        Tests sending and receiving a message with the SignalR server.
        """
        login_url = f'http://{aspnet_server}/api/auth/login'
        logging.info('Attempting to log in at %s', login_url)
        login_data = {'username': 'test', 'password': 'password'}
        response = requests.post(login_url, json=login_data, timeout=10)
        token = response.json().get('token')
        if not token:
            logging.error('Failed to obtain token from login response')
            raise AssertionError('Failed to obtain token from login response')
        logging.info('Obtained token: %s', token)

        url = f'http://{aspnet_server}/weatherHub'
        logging.info('Testing send and receive message with token to %s', url)

        def token_factory() -> str:
            return cast('str', token)

        client = SignalRClient(
            url=url,
            access_token_factory=token_factory,
            headers={'mycustomheader': 'mycustomheadervalue'},
        )

        received_messages = []

        async def on_message_received(arguments: Any) -> None:
            user, message = arguments
            logging.info('Message received from %s: %s', user, message)
            received_messages.append((user, message))
            if len(received_messages) >= 1:
                task.cancel()

        client.on('ReceiveMessage', on_message_received)

        task = asyncio.create_task(client.run())

        async def _on_open() -> None:
            logging.info('Connection with token opened, sending message')
            await client.send('SendMessage', ['testuser', 'Hello, World!'])  # type: ignore[list-item]

        client.on_open(_on_open)

        try:
            with suppress(asyncio.CancelledError):
                await task
        except ServerError as e:
            logging.error('Server error: %s', e)
            raise

        # Verify if the message was received correctly
        assert received_messages, 'No messages were received'
        assert received_messages[0] == (
            'testuser',
            'Hello, World!',
        ), f'Unexpected message received: {received_messages[0]}'

        # Log detailed messages received
        for user, message in received_messages:
            logging.info('Detailed Log: Message from %s - %s', user, message)

    async def test_result_from_client(self, aspnet_server: str) -> None:
        """
        Tests send result from client when SignalR server use InvokeAsync method.
        """
        login_url = f'http://{aspnet_server}/api/auth/login'
        logging.info('Attempting to log in at %s', login_url)
        login_data = {'username': 'test', 'password': 'password'}
        response = requests.post(login_url, json=login_data, timeout=10)
        token = response.json().get('token')
        if not token:
            logging.error('Failed to obtain token from login response')
            raise AssertionError('Failed to obtain token from login response')
        logging.info('Obtained token: %s', token)

        url = f'http://{aspnet_server}/weatherHub'
        logging.info('Testing reply when receive InvokeAsync message with token to %s', url)

        def token_factory() -> str:
            return cast('str', token)

        client = SignalRClient(
            url=url,
            access_token_factory=token_factory,
            headers={'mycustomheader': 'mycustomheadervalue'},
        )

        received_messages = []

        async def on_result_require(arguments: Any) -> str:
            argument = arguments[0]
            logging.info('Message to reply received: %s', argument)
            return 'Reply message'

        async def on_message_received(arguments: Any) -> None:
            user, message = arguments
            logging.info('Server received the reply and now send a message from %s: %s', user, message)
            received_messages.append((user, message))
            if len(received_messages) >= 1:
                task.cancel()

        client.on('ResultRequired', on_result_require)
        client.on('SuccessReceivedMessage', on_message_received)

        task = asyncio.create_task(client.run())

        async def _on_open() -> None:
            logging.info('Connection with token opened, sending message to trigger invoke async method')
            await client.send('TriggerResultRequired', ['testuser', 'Hello, World!'])  # type: ignore[list-item]

        client.on_open(_on_open)

        try:
            with suppress(asyncio.CancelledError):
                await asyncio.wait_for(task, timeout=30)  # Set a timeout for the task
        except ServerError as e:
            logging.error('Server error: %s', e)
            raise
        except asyncio.TimeoutError:
            logging.error('Test timed out')
            task.cancel()
            await task

        # Verify if the message was received correctly
        assert received_messages, 'No messages were received'
        assert received_messages[0] == (
            'testuser',
            'Hello, World!',
        ), f'Unexpected message received: {received_messages[0]}'

        # Log detailed messages received
        for user, message in received_messages:
            logging.info('Detailed Log: Message from %s - %s', user, message)