File: test_fork.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (136 lines) | stat: -rw-r--r-- 4,542 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from pypy.module.thread.test.support import GenericTestThread

class AppTestFork(GenericTestThread):
    spaceconfig = dict(usemodules=GenericTestThread.spaceconfig['usemodules'] + ('imp',))

    def test_fork_with_thread(self):
        # XXX This test depends on a multicore machine, as busy_thread must
        # aquire the GIL the instant that the main thread releases it.
        # It will incorrectly pass if the GIL is not grabbed in time.
        import _thread
        import os
        import time

        if not hasattr(os, 'fork'):
            skip("No fork on this platform")
        if not self.runappdirect:
            skip("Not reliable before translation")

        def busy_thread():
            print('sleep')
            while run:
                time.sleep(0)
            done.append(None)

        for i in range(150):
            run = True
            done = []
            try:
                print('sleep')
                _thread.start_new(busy_thread, ())

                pid = os.fork()
                if pid == 0:
                    os._exit(0)
                else:
                    self.timeout_killer(pid, 10)
                    exitcode = os.waitpid(pid, 0)[1]
                    assert exitcode == 0 # if 9, process was killed by timer!
            finally:
                run = False
                self.waitfor(lambda: done)
                assert done

    def test_forked_can_thread(self):
        "Checks that a forked interpreter can start a thread"
        import _thread
        import os

        if not hasattr(os, 'fork'):
            skip("No fork on this platform")

        for i in range(10):
            # pre-allocate some locks
            _thread.start_new_thread(lambda: None, ())
            print('sleep')

            pid = os.fork()
            if pid == 0:
                _thread.start_new_thread(lambda: None, ())
                os._exit(0)
            else:
                self.timeout_killer(pid, 10)
                exitcode = os.waitpid(pid, 0)[1]
                assert exitcode == 0 # if 9, process was killed by timer!

    def test_forked_is_main_thread(self):
        "Checks that a forked interpreter is the main thread"
        import os, _thread, signal

        if not hasattr(os, 'fork'):
            skip("No fork on this platform")

        def threadfunction():
            pid = os.fork()
            if pid == 0:
                print('in child')
                # signal() only works from the 'main' thread
                signal.signal(signal.SIGUSR1, signal.SIG_IGN)
                os._exit(42)
            else:
                self.timeout_killer(pid, 10)
                exitcode = os.waitpid(pid, 0)[1]
                feedback.append(exitcode)

        feedback = []
        _thread.start_new_thread(threadfunction, ())
        self.waitfor(lambda: feedback)
        # if 0, an (unraisable) exception was raised from the forked thread.
        # if 9, process was killed by timer.
        # if 42<<8, os._exit(42) was correctly reached.
        assert feedback == [42<<8]

    def test_nested_import_lock_fork(self):
        """Check fork() in main thread works while the main thread is doing an import"""
        # Issue 9573: this used to trigger RuntimeError in the child process
        import imp
        import os
        import time

        if not hasattr(os, 'fork'):
            skip("No fork on this platform")

        def fork_with_import_lock(level):
            release = 0
            in_child = False
            try:
                try:
                    for i in range(level):
                        imp.acquire_lock()
                        release += 1
                    pid = os.fork()
                    in_child = not pid
                finally:
                    for i in range(release):
                        imp.release_lock()
            except RuntimeError:
                if in_child:
                    if verbose > 1:
                        print("RuntimeError in child")
                    os._exit(1)
                raise
            if in_child:
                os._exit(0)

            for i in range(10):
                spid, status = os.waitpid(pid, os.WNOHANG)
                if spid == pid:
                    break
                time.sleep(1.0)
            assert spid == pid
            assert status == 0

        # Check this works with various levels of nested
        # import in the main thread
        for level in range(5):
            fork_with_import_lock(level)