File: test_deadlock.py

package info (click to toggle)
easyprocess 1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 240 kB
  • sloc: python: 774; sh: 25; makefile: 3
file content (77 lines) | stat: -rw-r--r-- 1,697 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import os
import sys
import threading
from time import sleep

import pytest
from pyvirtualdisplay.display import Display

from easyprocess import EasyProcess

python = sys.executable
# requirement:  apt install imagemagick

# deadlock
# popen.communicate() hangs
# no deadlock with temp_files

PROG = """
from PIL import Image
Image.new("RGB",(99, 99)).show()
"""

EASYPROCESS_USE_TEMP_FILES = os.environ.get("EASYPROCESS_USE_TEMP_FILES")


def test_dummy():
    pass


# skip these tests for Windows/Mac
# and when 'use_temp_files' is forced by env variable
if sys.platform.startswith("linux") and not EASYPROCESS_USE_TEMP_FILES:

    def test_has_imagemagick():
        assert EasyProcess(["display", "-version"]).call().return_code == 0

    @pytest.mark.timeout(120)
    def test_deadlock_temp_files():
        with Display():
            p = EasyProcess(
                [
                    python,
                    "-c",
                    PROG,
                ],
                use_temp_files=True,
            )
            p.start()
            sleep(2)
            # hangs with pipes
            p.stop()

    @pytest.mark.timeout(120)
    def test_deadlock_pipe():
        with Display():
            p = EasyProcess(
                [
                    python,
                    "-c",
                    PROG,
                ],
                use_temp_files=False,
            )
            p.start()
            sleep(2)

            def start():
                # hangs with pipes
                p.stop()

            thread = threading.Thread(target=start)
            thread.start()

            sleep(6)
            assert thread.is_alive()

        thread.join()