File: connection_tests.py

package info (click to toggle)
aioprocessing 2.0.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 228 kB
  • sloc: python: 1,463; sh: 13; makefile: 9
file content (124 lines) | stat: -rw-r--r-- 3,793 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import unittest
from array import array

import aioprocessing
import aioprocessing.mp as multiprocessing
from aioprocessing.connection import AioConnection, AioListener, AioClient
from aioprocessing.mp import Process

from ._base_test import BaseTest


def conn_send(conn, val):
    conn.send(val)


def client_sendback(event, address, authkey):
    event.wait()
    conn = multiprocessing.connection.Client(address, authkey=authkey)
    got = conn.recv()
    conn.send(got + got)
    conn.close()


def listener_sendback(event, address, authkey):
    listener = multiprocessing.connection.Listener(address, authkey=authkey)
    event.set()
    conn = listener.accept()
    inval = conn.recv()
    conn.send_bytes(array("i", [inval, inval + 1, inval + 2, inval + 3]))
    conn.close()


class PipeTest(BaseTest):
    def test_pipe(self):
        conn1, conn2 = aioprocessing.AioPipe()
        val = 25
        p = Process(target=conn_send, args=(conn1, val))
        p.start()

        async def conn_recv():
            out = await conn2.coro_recv()
            self.assertEqual(out, val)

        self.loop.run_until_complete(conn_recv())


class ListenerTest(BaseTest):
    def test_listener(self):
        address = ("localhost", 8999)
        authkey = b"abcdefg"
        event = multiprocessing.Event()
        p = Process(target=client_sendback, args=(event, address, authkey))
        p.start()
        listener = AioListener(address, authkey=authkey)
        try:
            event.set()
            conn = listener.accept()
            self.assertIsInstance(conn, AioConnection)
            conn.send("")
            conn.close()
            event.clear()
            p.join()
            p = Process(target=client_sendback, args=(event, address, authkey))
            p.start()

            def conn_accept():
                fut = listener.coro_accept()
                event.set()
                conn = yield from fut
                self.assertIsInstance(conn, AioConnection)
                yield from conn.coro_send("hi there")
                back = yield from conn.coro_recv()
                self.assertEqual(back, "hi therehi there")
                conn.close()

            self.loop.run_until_complete(conn_accept())
            p.join()
        finally:
            listener.close()

    def test_client(self):
        address = ("localhost", 8999)
        authkey = b"abcdefg"
        event = multiprocessing.Event()
        p = Process(target=listener_sendback, args=(event, address, authkey))
        p.start()
        event.wait()
        conn = AioClient(address, authkey=authkey)
        self.assertIsInstance(conn, AioConnection)

        def do_work():
            yield from conn.coro_send(25)
            arr = array("i", [0, 0, 0, 0])
            yield from conn.coro_recv_bytes_into(arr)
            self.assertEqual(arr, array("i", [25, 26, 27, 28]))
            conn.close()

        self.loop.run_until_complete(do_work())
        p.join()

    def test_listener_ctxmgr(self):
        address = ("localhost", 8999)
        authkey = b"abcdefg"
        with AioListener(address, authkey=authkey) as listener:
            self.assertIsInstance(listener, AioListener)
        self.assertRaises(OSError, listener.accept)

    def test_client_ctxmgr(self):
        address = ("localhost", 8999)
        authkey = b"abcdefg"
        event = multiprocessing.Event()
        p = Process(target=listener_sendback, args=(event, address, authkey))
        p.daemon = True
        p.start()
        event.wait()
        with AioClient(address, authkey=authkey) as conn:
            self.assertIsInstance(conn, AioConnection)
        self.assertRaises(OSError, conn.send, "hi")
        p.terminate()
        p.join()


if __name__ == "__main__":
    unittest.main()