File: test_process_spawning.py

package info (click to toggle)
uvloop 0.17.0%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 8,308 kB
  • sloc: python: 8,139; ansic: 91; makefile: 42
file content (110 lines) | stat: -rw-r--r-- 3,737 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
import ctypes.util
import logging
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from unittest import TestCase

import uvloop


class ProcessSpawningTestCollection(TestCase):

    def test_spawning_external_process(self):
        """Test spawning external process (using `popen` system call) that
        cause loop freeze."""

        async def run(loop):
            event = asyncio.Event()

            dummy_workers = [simulate_loop_activity(loop, event)
                             for _ in range(5)]
            spawn_worker = spawn_external_process(loop, event)
            done, pending = await asyncio.wait([
                asyncio.ensure_future(fut)
                for fut in ([spawn_worker] + dummy_workers)
            ])
            exceptions = [result.exception()
                          for result in done if result.exception()]
            if exceptions:
                raise exceptions[0]

            return True

        async def simulate_loop_activity(loop, done_event):
            """Simulate loop activity by busy waiting for event."""
            while True:
                try:
                    await asyncio.wait_for(done_event.wait(), timeout=0.1)
                except asyncio.TimeoutError:
                    pass

                if done_event.is_set():
                    return None

        async def spawn_external_process(loop, event):
            executor = ThreadPoolExecutor()
            try:
                call = loop.run_in_executor(executor, spawn_process)
                await asyncio.wait_for(call, timeout=3600)
            finally:
                event.set()
                executor.shutdown(wait=False)
            return True

        BUFFER_LENGTH = 1025
        BufferType = ctypes.c_char * (BUFFER_LENGTH - 1)

        def run_echo(popen, fread, pclose):
            fd = popen('echo test'.encode('ASCII'), 'r'.encode('ASCII'))
            try:
                while True:
                    buffer = BufferType()
                    data = ctypes.c_void_p(ctypes.addressof(buffer))

                    # -> this call will freeze whole loop in case of bug
                    read = fread(data, 1, BUFFER_LENGTH, fd)
                    if not read:
                        break
            except Exception:
                logging.getLogger().exception('read error')
                raise
            finally:
                pclose(fd)

        def spawn_process():
            """Spawn external process via `popen` system call."""

            stdio = ctypes.CDLL(ctypes.util.find_library('c'))

            # popen system call
            popen = stdio.popen
            popen.argtypes = (ctypes.c_char_p, ctypes.c_char_p)
            popen.restype = ctypes.c_void_p

            # pclose system call
            pclose = stdio.pclose
            pclose.argtypes = (ctypes.c_void_p,)
            pclose.restype = ctypes.c_int

            # fread system call
            fread = stdio.fread
            fread.argtypes = (ctypes.c_void_p, ctypes.c_size_t,
                              ctypes.c_size_t, ctypes.c_void_p)
            fread.restype = ctypes.c_size_t

            for iteration in range(1000):
                t = Thread(target=run_echo,
                           args=(popen, fread, pclose),
                           daemon=True)
                t.start()
                t.join(timeout=10.0)
                if t.is_alive():
                    raise Exception('process freeze detected at {}'
                                    .format(iteration))

            return True

        loop = uvloop.new_event_loop()
        proc = loop.run_until_complete(run(loop))
        self.assertTrue(proc)