File: test_unixsocket.py

package info (click to toggle)
aiorpcx 0.25.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 560 kB
  • sloc: python: 6,647; makefile: 18
file content (47 lines) | stat: -rwxr-xr-x 1,494 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import sys
import asyncio
import pytest
import tempfile
from os import path
from aiorpcx import connect_us, serve_us
from test_session import MyServerSession

if sys.platform.startswith("win"):
    pytest.skip("skipping tests not compatible with Windows platform", allow_module_level=True)


@pytest.fixture
def us_server(event_loop):
    with tempfile.TemporaryDirectory() as tmp_folder:
        socket_path = path.join(tmp_folder, 'test.socket')
        coro = serve_us(MyServerSession, socket_path, loop=event_loop)
        server = event_loop.run_until_complete(coro)
        yield socket_path
        tasks = asyncio.all_tasks(event_loop)

        async def close_all():
            server.close()
            await server.wait_closed()
            if tasks:
                await asyncio.wait(tasks)
        event_loop.run_until_complete(close_all())


class TestUSTransport:

    @pytest.mark.asyncio
    async def test_send_request(self, us_server):
        async with connect_us(us_server) as session:
            assert await session.send_request('echo', [23]) == 23

    @pytest.mark.asyncio
    async def test_is_closing(self, us_server):
        async with connect_us(us_server) as session:
            assert not session.is_closing()
            await session.close()
            assert session.is_closing()

        async with connect_us(us_server) as session:
            assert not session.is_closing()
            await session.abort()
            assert session.is_closing()