File: test_posix_extra.py

package info (click to toggle)
pypy 7.3.3%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 113,660 kB
  • sloc: python: 1,419,707; ansic: 64,313; cpp: 3,290; sh: 2,763; makefile: 540; xml: 256; asm: 213; lisp: 45; awk: 4
file content (39 lines) | stat: -rw-r--r-- 782 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import pytest
import sys, os, subprocess


CODE = """
import sys, os, thread, time

fd1, fd2 = os.pipe()
f1 = os.fdopen(fd1, 'r', 0)
f2 = os.fdopen(fd2, 'w', 0)

def f():
    print "thread started"
    x = f1.read(1)
    assert x == "X"
    print "thread exit"

thread.start_new_thread(f, ())
time.sleep(0.5)
if os.fork() == 0:   # in the child
    time.sleep(0.5)
    x = f1.read(1)
    assert x == "Y"
    print "ok!"
    sys.exit()

f2.write("X")   # in the parent
f2.write("Y")   # in the parent
time.sleep(1.0)
"""


@pytest.mark.skipif("not getattr(os, 'fork', None)")
def test_thread_fork_file_lock():
    output = subprocess.check_output([sys.executable, '-u', '-c', CODE])
    assert output.splitlines() == [
        'thread started',
        'thread exit',
        'ok!']