File: test_posix_extra.py

package info (click to toggle)
pypy3 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 111,848 kB
  • sloc: python: 1,291,746; ansic: 74,281; asm: 5,187; cpp: 3,017; sh: 2,533; makefile: 544; xml: 243; lisp: 45; csh: 21; awk: 4
file content (40 lines) | stat: -rw-r--r-- 797 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import py
import sys, os, subprocess


# Python 2 script (note the print statements and the 'thread' module) that
# test_thread_fork_file_lock runs in a child interpreter via '-c'.
#
# What the script does, step by step:
#   * creates a pipe and wraps both ends in file objects opened with
#     buffering=0;
#   * starts a background thread that prints "thread started", reads one
#     byte from f1, asserts it is "X", then prints "thread exit";
#   * sleeps 0.5s and forks; nothing has been written to the pipe yet, so
#     the thread's read is still pending at that point;
#   * in the child: sleeps 0.5s, reads one byte from the same f1 object,
#     asserts it is "Y", prints "ok!" and exits;
#   * in the parent: writes "X" then "Y" and sleeps 1.0s (presumably to give
#     both readers time to finish before the parent exits).
#
# NOTE(review): judging by the test's name, this presumably checks that a
# per-file lock held by the blocked thread at fork time does not deadlock
# the child's read on the same file object -- confirm against the
# interpreter's file implementation.
CODE = """
import sys, os, thread, time

fd1, fd2 = os.pipe()
f1 = os.fdopen(fd1, 'r', 0)
f2 = os.fdopen(fd2, 'w', 0)

def f():
    print "thread started"
    x = f1.read(1)
    assert x == "X"
    print "thread exit"

thread.start_new_thread(f, ())
time.sleep(0.5)
if os.fork() == 0:   # in the child
    time.sleep(0.5)
    x = f1.read(1)
    assert x == "Y"
    print "ok!"
    sys.exit()

f2.write("X")   # in the parent
f2.write("Y")   # in the parent
time.sleep(1.0)
"""


def test_thread_fork_file_lock():
    """Run the CODE script in a subprocess and check what it prints.

    The test is skipped on platforms where ``os`` has no ``fork``.
    The script is executed with the current interpreter using ``-u`` and
    ``-c``; its captured stdout must consist of exactly three lines, in
    order: the thread starting, the thread exiting, and the forked
    child's final "ok!".  ``check_output`` raises if the subprocess exits
    with a non-zero status, which also fails the test.
    """
    fork_available = hasattr(os, 'fork')
    if not fork_available:
        py.test.skip("requires 'fork'")

    expected_lines = ['thread started', 'thread exit', 'ok!']
    command = [sys.executable, '-u', '-c', CODE]
    captured = subprocess.check_output(command)
    assert captured.splitlines() == expected_lines