1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
# mypy: allow-untyped-defs
try:
from importlib import reload
except ImportError:
pass
import json
import os
import queue
import tempfile
import threading
import pytest
from . import serve
from wptserve import logger
class ServerProcSpy(serve.ServerProc):
instances = None
def start(self, *args, **kwargs):
result = super().start(*args, **kwargs)
if ServerProcSpy.instances is not None:
ServerProcSpy.instances.put(self)
return result
serve.ServerProc = ServerProcSpy # type: ignore
@pytest.fixture()
def server_subprocesses():
ServerProcSpy.instances = queue.Queue()
yield ServerProcSpy.instances
ServerProcSpy.instances = None
@pytest.fixture()
def tempfile_name():
fd, name = tempfile.mkstemp()
yield name
os.close(fd)
os.remove(name)
def test_subprocess_exit(server_subprocesses, tempfile_name):
timeout = 30
def target():
# By default, the server initially creates a child process to validate
# local system configuration. That process is unrelated to the behavior
# under test, but at the time of this writing, the parent uses the same
# constructor that is also used to create the long-running processes
# which are relevant to this functionality. Disable the check so that
# the constructor is only used to create relevant processes.
config = {
"browser_host": "localhost",
"alternate_hosts": {"alt": "127.0.0.1"},
"check_subdomains": False,
}
with open(tempfile_name, "w") as handle:
json.dump(config, handle)
# The `logger` module from the wptserver package uses a singleton
# pattern which resists testing. In order to avoid conflicting with
# other tests which rely on that module, pre-existing state is
# discarded through an explicit "reload" operation.
reload(logger)
serve.run(config_path=tempfile_name)
thread = threading.Thread(target=target)
thread.start()
server_subprocesses.get(True, timeout)
subprocess = server_subprocesses.get(True, timeout)
subprocess.request_shutdown()
subprocess.wait()
thread.join(timeout)
assert not thread.is_alive()
|