1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
import signal
import socketserver
import sys
from multiprocessing import Process
from time import sleep
class TestHandler(socketserver.StreamRequestHandler):
    """Request handler of the test server: echoes each line upper-cased."""

    def handle(self):
        """Serve a single connection until the peer closes it.

        Each received line is sent back upper-cased, except the literal
        line ``exit``, which flags the owning server to stop serving.
        """
        exit_command = b"exit\n"
        # readline() returns b"" once the peer has closed the connection,
        # which ends the iteration.
        for received in iter(self.rfile.readline, b""):
            if received != exit_command:
                self.request.sendall(received.upper())
                continue
            # Set the server's private (name-mangled) shutdown flag directly
            # so that the server terminates itself.
            self.server._BaseServer__shutdown_request = True
class TestServer(socketserver.TCPServer):
    """This server class is used for testing only"""

    allow_reuse_address = True

    def write_test_patterns(self):
        """Write the full sequence of test output patterns to stderr.

        The sequence is: blank lines, lines with special characters,
        non-ascii lines, the ``started`` marker, a long run of filler
        lines, undecodable raw bytes and finally the ``finally started``
        marker, after which stderr is flushed.
        """
        self.write_blank_lines(100)
        self.write_complex_strings(10)
        self.write_non_ascii(8)
        # "started" is preceded by 18 non-blank lines
        # (10 complex strings + 8 non-ascii lines).
        sys.stderr.write("started\n")
        self.write_long_output(100)
        self.write_garbage_bytes(10)
        sys.stderr.write("finally started\n")
        sys.stderr.flush()

    def write_long_output(self, n):
        """write several lines to test pattern matching
        with process with a lot of output"""
        for _ in range(n):
            sys.stderr.write("spam, bacon, eggs\n")

    def write_non_ascii(self, n):
        """non-ascii characters must be supported"""
        for _ in range(n):
            sys.stderr.write("Ê�æ�pP��çîöē�P��adåráøū\n")

    def write_garbage_bytes(self, n):
        """non-mappable bytes characters must not break xprocess"""
        for _ in range(n):
            # The raw bytes bypass the text layer of sys.stderr. Flush any
            # text still pending there first, otherwise the bytes can be
            # emitted ahead of text that was written before them whenever
            # stderr is not line-buffered.
            sys.stderr.flush()
            sys.stderr.buffer.write(b"\x90\x81\x91")
            sys.stderr.write("\n")

    def write_complex_strings(self, n):
        """Special/control characters should not cause problems"""
        for i in range(n):
            sys.stderr.write(f"{i} , % /.%,@%@._%%# #/%/ %\n")

    def write_blank_lines(self, n):
        """Blank lines should be ignored by xprocess"""
        for _ in range(n):
            sys.stderr.write("\n")

    def fork_children(self, target, amount):
        """forks multiple children for testing process tree termination

        Starts ``amount`` processes, each running ``target``.
        """
        for _ in range(amount):
            p = Process(target=target)
            p.start()
def do_nothing():
    """Idle forever; used as the target of the forked child processes.

    Defined at module level (not inside the ``__main__`` guard) so that
    child processes created with the ``spawn`` or ``forkserver`` start
    methods, which re-import this module with the guard skipped, can
    still resolve the target by name.
    """
    while True:
        sleep(1)


if __name__ == "__main__":
    # The port to listen on is given as the first command line argument.
    HOST, PORT = "localhost", int(sys.argv[1])
    server = TestServer((HOST, PORT), TestHandler)

    if "--ignore-sigterm" in sys.argv and sys.platform != "win32":
        # ignore sigterm for testing XProcessInfo.terminate
        # when processes fail to exit
        signal.signal(signal.SIGTERM, signal.SIG_IGN)

    if "--no-children" not in sys.argv:
        server.fork_children(do_nothing, 3)

    server.write_test_patterns()
    server.serve_forever()
|