File: fork_in_thread.py

package info (click to toggle)
python-eventlet 0.40.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,200 kB
  • sloc: python: 25,101; sh: 78; makefile: 31
file content (49 lines) | stat: -rw-r--r-- 874 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import eventlet

eventlet.monkey_patch()

import os
import time
import threading

results = []
parent = True


def check_current():
    if threading.current_thread() not in threading.enumerate():
        raise SystemExit(17)


def background():
    time.sleep(1)
    check_current()
    results.append(True)


def forker():
    pid = os.fork()
    check_current()
    if pid != 0:
        # We're in the parent. Wait for child to die.
        wait_pid, status = os.wait()
        exit_code = os.waitstatus_to_exitcode(status)
        assert wait_pid == pid
        assert exit_code == 0, exit_code
    else:
        global parent
        parent = False
    results.append(True)


t = threading.Thread(target=background)
t.start()
t2 = threading.Thread(target=forker)
t2.start()
t2.join()
t.join()

check_current()
assert results == [True, True]
if parent:
    print("pass")