File: test_websocket.py

package info (click to toggle)
aiorpcx 0.25.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 560 kB
  • sloc: python: 6,647; makefile: 18
file content (43 lines) | stat: -rwxr-xr-x 1,481 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import pytest

from aiorpcx import connect_ws, NetAddress, serve_ws

from test_session import MyServerSession


@pytest.fixture(scope="function")
async def ws_server(unused_tcp_port):
    """Serve MyServerSession over WebSocket for the duration of one test.

    Listens on 'localhost' at the port supplied by the ``unused_tcp_port``
    fixture and yields the matching ``ws://`` URL.  Once the test is done
    the server is closed and awaited until it has shut down.
    """
    url = f'ws://localhost:{unused_tcp_port}'
    listener = await serve_ws(MyServerSession, 'localhost', unused_tcp_port)
    yield url
    # Teardown: stop accepting connections, then wait for the close to finish.
    listener.close()
    await listener.wait_closed()


@pytest.mark.filterwarnings("ignore:'with .*:DeprecationWarning")
class TestWSTransport:
    """Client-side checks of a session opened with connect_ws.

    Every test connects to the URL provided by the ``ws_server`` fixture.
    """

    @pytest.mark.asyncio
    async def test_send_request(self, ws_server):
        """An 'echo' request with argument 23 must come back as 23."""
        async with connect_ws(ws_server) as session:
            reply = await session.send_request('echo', [23])
            assert reply == 23

    @pytest.mark.asyncio
    async def test_basics(self, ws_server):
        """Check the proxy and remote-address accessors of a fresh session."""
        async with connect_ws(ws_server) as session:
            assert session.proxy() is None
            peer = session.remote_address()
            assert isinstance(peer, NetAddress)
            # The host may be reported by name or as a loopback address.
            assert str(peer.host) in ('localhost', '::1', '127.0.0.1')
            # The server URL ends with its port, so the peer port must match
            # the tail of the URL.
            assert ws_server.endswith(str(peer.port))

    @pytest.mark.asyncio
    async def test_is_closing(self, ws_server):
        """is_closing() flips from False to True after close() or abort()."""
        # Exercise both shutdown calls, each on its own fresh connection.
        for shutdown in ('close', 'abort'):
            async with connect_ws(ws_server) as session:
                assert not session.is_closing()
                await getattr(session, shutdown)()
                assert session.is_closing()