File: test_server.py

package info (click to toggle)
python-asgiref 3.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 428 kB
  • sloc: python: 2,635; makefile: 19
file content (145 lines) | stat: -rw-r--r-- 4,374 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import asyncio
import socket as sock

import pytest
import pytest_asyncio

from asgiref.server import StatelessServer


async def sock_recvfrom(sock, n):
    while True:
        try:
            return sock.recvfrom(n)
        except BlockingIOError:
            await asyncio.sleep(0)


class Server(StatelessServer):
    def __init__(self, application, max_applications=1000):
        super().__init__(
            application,
            max_applications=max_applications,
        )
        self._sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
        self._sock.setblocking(False)
        self._sock.bind(("127.0.0.1", 0))

    @property
    def address(self):
        return self._sock.getsockname()

    async def handle(self):
        while True:
            data, addr = await sock_recvfrom(self._sock, 4096)
            data = data.decode("utf-8")

            if data.startswith("Register"):
                _, usr_name = data.split(" ")
                input_quene = self.get_or_create_application_instance(usr_name, addr)
                input_quene.put_nowait(b"Welcome")

            elif data.startswith("To"):
                _, usr_name, msg = data.split(" ", 2)
                input_quene = self.get_or_create_application_instance(usr_name, addr)
                input_quene.put_nowait(msg.encode("utf-8"))

    async def application_send(self, scope, message):
        self._sock.sendto(message, scope)

    def close(self):
        self._sock.close()
        for details in self.application_instances.values():
            details["future"].cancel()


class Client:
    def __init__(self, name):
        self._sock = sock.socket(sock.AF_INET, sock.SOCK_DGRAM)
        self._sock.setblocking(False)
        self.name = name

    async def register(self, server_addr, name=None):
        name = name or self.name
        self._sock.sendto(f"Register {name}".encode(), server_addr)

    async def send(self, server_addr, to, msg):
        self._sock.sendto(f"To {to} {msg}".encode(), server_addr)

    async def get_msg(self):
        msg, server_addr = await sock_recvfrom(self._sock, 4096)
        return msg, server_addr

    def close(self):
        self._sock.close()


@pytest_asyncio.fixture(scope="function")
async def server():
    async def app(scope, receive, send):
        while True:
            msg = await receive()
            await send(msg)

    server = Server(app, 10)
    yield server
    server.close()


async def check_client_msg(client, expected_address, expected_msg):
    msg, server_addr = await asyncio.wait_for(client.get_msg(), timeout=1.0)
    assert msg == expected_msg
    assert server_addr == expected_address


@pytest.mark.asyncio
async def test_stateless_server(server):
    """StatelessServer can be instantiated with an ASGI 3 application."""
    """Create a UDP Server can register instance based on name from message of client.
    Clients can communicate to other client by name through server"""

    client1 = Client(name="client1")
    client2 = Client(name="client2")

    async def check_client1_behavior():
        await client1.register(server.address)
        await check_client_msg(client1, server.address, b"Welcome")
        await client1.send(server.address, "client2", "Hello")

    async def check_client2_behavior():
        await client2.register(server.address)
        await check_client_msg(client2, server.address, b"Welcome")
        await check_client_msg(client2, server.address, b"Hello")

    class Done(Exception):
        pass

    async def do_test():
        await asyncio.gather(check_client1_behavior(), check_client2_behavior())
        raise Done

    try:
        await asyncio.gather(server.arun(), do_test())
    except Done:
        pass


@pytest.mark.asyncio
async def test_server_delete_instance(server):
    """The max_applications of Server is 10. After 20 times register, application number should be 10."""
    client1 = Client(name="client1")

    class Done(Exception):
        pass

    async def client1_multiple_register():
        for i in range(20):
            await client1.register(server.address, name=f"client{i}")
            print(f"client{i}")
            await check_client_msg(client1, server.address, b"Welcome")
        raise Done

    try:
        await asyncio.gather(client1_multiple_register(), server.arun())
    except Done:
        pass