File: test_startup_timeout.py

package info (click to toggle)
python-pytest-xprocess 0.22.2-0.1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 288 kB
  • sloc: python: 698; makefile: 34; sh: 10
file content (40 lines) | stat: -rw-r--r-- 1,027 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import socket
import sys
import time
from pathlib import Path

import pytest

from xprocess import ProcessStarter

server_path = Path(__file__).parent.joinpath("server.py").absolute()


def cleanup_server_instance(tcp_port):
    sock = socket.socket()
    sock.connect(("localhost", tcp_port))
    try:
        for _ in range(10):
            sock.sendall(b"exit\n")
            sock.recv(1)
            time.sleep(0.1)
    except (
        BrokenPipeError,
        ConnectionAbortedError,
        ConnectionResetError,
    ):  # Server is terminated
        pass
    sock.close()


@pytest.mark.parametrize("proc_name", ["s1", "s2", "s3"])
def test_timeout_raise_exception(tcp_port, proc_name, xprocess, request):
    class Starter(ProcessStarter):
        timeout = 2
        max_read_lines = 500
        pattern = "will not match"
        args = [sys.executable, server_path, tcp_port, "--no-children"]

    with pytest.raises(TimeoutError):
        xprocess.ensure(proc_name, Starter)
    cleanup_server_instance(tcp_port)