File: test_regr1.py

package info (click to toggle)
uvloop 0.21.0%2Bds1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 10,308 kB
  • sloc: python: 8,273; ansic: 108; makefile: 42
file content (122 lines) | stat: -rw-r--r-- 3,403 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import queue
import multiprocessing
import signal
import threading
import unittest

import uvloop

from uvloop import _testbase as tb

class EchoServerProtocol(asyncio.Protocol):
    """Server-side protocol that greets every new connection.

    Despite the name it does not echo input back: it writes a single
    byte to the peer as soon as the connection is established.
    """

    def connection_made(self, transport):
        """Send the one-byte greeting over the freshly opened transport."""
        greeting = b'z'
        transport.write(greeting)


class EchoClientProtocol(asyncio.Protocol):
    """Client-side protocol that hangs up on the first received data.

    Once the connection is gone, the event loop passed to the
    constructor is stopped.
    """

    def __init__(self, loop):
        # Loop to stop when the connection is lost.
        self.loop = loop

    def connection_made(self, transport):
        """Remember the transport so it can be closed later."""
        self.transport = transport

    def data_received(self, data):
        """Close the connection as soon as any data arrives."""
        active_transport = self.transport
        active_transport.close()

    def connection_lost(self, exc):
        """Stop the owning event loop once the connection is lost."""
        owning_loop = self.loop
        owning_loop.stop()


class FailedTestError(BaseException):
    """Raised from the SIGALRM handler when the test runs for too long.

    Derives from BaseException rather than Exception so that
    ``except Exception`` handlers do not intercept it.
    """


def run_server(quin, qout):
    """Run an echo server in a background thread until told to stop.

    The bound socket address is put on *qout* once the server is
    listening. The function then blocks on *quin*; when any item arrives
    it asks the server loop to stop and waits up to one second for the
    server thread. The thread puts ``'stopped'`` on *qout* after it has
    shut down.
    """
    server_loop = None

    def server_thread():
        nonlocal server_loop
        # Publish the loop to the enclosing scope so the outer function
        # can stop it from another thread.
        server_loop = uvloop.new_event_loop()
        loop = server_loop
        asyncio.set_event_loop(loop)

        # Use the IPv6 loopback only when IPv4 is unavailable.
        local_addr = '127.0.0.1' if tb.has_IPv4 else '::1'

        # Port 0 lets the OS choose a free port; report it to the caller.
        server = loop.run_until_complete(
            loop.create_server(EchoServerProtocol, local_addr, 0))
        qout.put(server.sockets[0].getsockname())

        # Serve until the outer function schedules loop.stop().
        loop.run_forever()

        server.close()
        loop.run_until_complete(server.wait_closed())
        try:
            loop.close()
        except Exception as exc:
            print(exc)
        qout.put('stopped')

    worker = threading.Thread(target=server_thread, daemon=True)
    worker.start()

    # Block until the caller sends the stop request.
    quin.get()
    server_loop.call_soon_threadsafe(server_loop.stop)
    worker.join(1)


class TestIssue39Regr(tb.UVTestCase):
    """See https://github.com/MagicStack/uvloop/issues/39 for details.

    Original code to reproduce the bug is by Jim Fulton.

    The scenario repeatedly starts an echo server, alternating between a
    helper thread and a child process, and connects a fresh client loop
    to it. SIGALRM acts as a watchdog: if the scenario is still running
    when the alarm fires, the test fails.
    """

    def on_alarm(self, sig, fr):
        """SIGALRM handler: abort the scenario if it is still running."""
        if self.running:
            raise FailedTestError

    def run_test(self):
        """Run ten rounds of the client/server exchange.

        Each round runs once with the server hosted by a thread and once
        with it hosted by a child process.
        """
        for _ in range(10):
            for threaded in [True, False]:
                if threaded:
                    qin, qout = queue.Queue(), queue.Queue()
                    threading.Thread(
                        target=run_server,
                        args=(qin, qout),
                        daemon=True).start()
                else:
                    qin = multiprocessing.Queue()
                    qout = multiprocessing.Queue()
                    multiprocessing.Process(
                        target=run_server,
                        args=(qin, qout),
                        daemon=True).start()

                # The server reports its bound address first.
                addr = qout.get()
                loop = self.new_loop()
                asyncio.set_event_loop(loop)
                loop.create_task(
                    loop.create_connection(
                        lambda: EchoClientProtocol(loop),
                        host=addr[0], port=addr[1]))
                # EchoClientProtocol stops the loop on connection loss.
                loop.run_forever()
                loop.close()
                # Ask the server to shut down and wait for its
                # 'stopped' acknowledgement.
                qin.put('stop')
                qout.get()

    @unittest.skipIf(
        multiprocessing.get_start_method(False) == 'spawn',
        'no need to test on macOS where spawn is used instead of fork')
    def test_issue39_regression(self):
        """Fail if the scenario does not finish before the 5s alarm."""
        previous_handler = signal.signal(signal.SIGALRM, self.on_alarm)
        signal.alarm(5)

        try:
            self.running = True
            self.run_test()
        except FailedTestError:
            self.fail('deadlocked in libuv')
        finally:
            self.running = False
            # Cancel the pending alarm so it cannot fire into code that
            # runs after this test.
            signal.alarm(0)
            # Restore the handler that was active before the test.
            # signal.signal() returns None when the previous handler was
            # not installed from Python; ignore the signal in that case.
            if previous_handler is None:
                previous_handler = signal.SIG_IGN
            signal.signal(signal.SIGALRM, previous_handler)