1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
|
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.
"""A watchdog process for debuggee processes spawned by tests.
Interacts with the main test runner process over stdio, and keeps track of running
debugpy processes. If the test runner process goes down, any debugpy test processes
are automatically killed.
"""
# Names exported by this module. stop() is not exported; start() registers it
# with atexit instead.
__all__ = ["start", "register_spawn", "unregister_spawn"]
import atexit
import os
import psutil
import subprocess
import sys
import threading
import time
from debugpy.common import log, messaging
from tests.watchdog import worker
# How long, in seconds, a single command sent to the worker may take before it
# is reported as timed out (see _invoke).
WATCHDOG_TIMEOUT = 10
# Name used for this watchdog in log messages and as the name of the JSON
# stream; it embeds the PID of the current process.
_name = f"watchdog-{os.getpid()}"
# JSON message stream connected to the worker's stdin/stdout; None until
# start() has created it.
_stream = None
# psutil.Popen object for the worker process; None until start() has spawned it.
_process = None
# Filename of the worker's log, taken from the first message the worker sends;
# None until that message has been read.
_worker_log_filename = None
def start():
    """Spawn the watchdog worker process and read its initial message.

    Does nothing if a stream to the worker already exists, or when running on
    Python 3.13 or later. Otherwise the worker script is launched with the
    current interpreter and the PID of this process as its only argument, and
    a JSON stream is set up over its stdin/stdout. The first message read from
    the worker must be a pair whose first item is "watchdog"; the second item
    is stored as the worker's log filename. Finally, stop() is registered to
    run at interpreter exit.
    """
    global _stream, _process, _worker_log_filename

    if _stream is not None or sys.version_info >= (3, 13):
        return

    runner_pid = os.getpid()
    command_line = [sys.executable, worker.__file__, str(runner_pid)]
    log.info(
        "Spawning {0} for tests-{1}:\n\n{2}",
        _name,
        runner_pid,
        "\n".join(map(repr, command_line)),
    )

    # Unbuffered pipes, so that each JSON message reaches the other side as
    # soon as it is written.
    _process = psutil.Popen(
        command_line,
        bufsize=0,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    _stream = messaging.JsonIOStream(_process.stdout, _process.stdin, _name)

    # The log filename is stored before the event name is checked, exactly as
    # a single unpacking assignment would do.
    greeting = _stream.read_json()
    event, _worker_log_filename = greeting
    assert event == "watchdog"

    atexit.register(stop)
def _dump_worker_log(command, problem, exc_info=None):
    """Log a problem with a watchdog command, including the worker's log.

    Args:
        command: Name of the watchdog command that ran into the problem.
        problem: Short description of what went wrong (e.g. "timed out").
        exc_info: Optional exception info. When given, the message is logged
            via log.swallow_exception(); otherwise via log.error().

    Returns:
        The full message that was logged. It contains the contents of the
        worker's log file when that file is known and readable, and a note
        explaining why the log is missing otherwise.
    """
    pieces = [f"{_name}.{command}() {problem}"]

    if _worker_log_filename is None:
        pieces.append(", but there is no log.")
    else:
        try:
            with open(_worker_log_filename) as log_file:
                contents = log_file.read()
        except Exception:
            # Best effort only: failing to read the log must not hide the
            # problem that is being reported.
            pieces.append(
                f", but log {_worker_log_filename} could not be retrieved."
            )
        else:
            pieces.append(f"; watchdog worker process log:\n\n{contents}")

    message = "".join(pieces)
    if exc_info is not None:
        log.swallow_exception("{0}", message, exc_info=exc_info)
    else:
        log.error("{0}", message)
    return message
def _invoke(command, *args):
    """Send a command to the watchdog worker and wait for its acknowledgement.

    The command is written to the worker as the JSON list [command, *args],
    and the next message read back must be ["ok"]. While waiting, a daemon
    thread watches the clock: if the exchange has not finished within
    WATCHDOG_TIMEOUT seconds, it logs the worker's log with a "timed out"
    message.

    Args:
        command: Name of the watchdog command to invoke.
        *args: JSON-serializable arguments for the command.

    Raises:
        AssertionError: If the worker replies with anything other than
            ["ok"], or if the exchange succeeded but only after the timeout
            had already been reported.
        Exception: Anything raised while writing to or reading from the
            stream is logged together with the worker's log and re-raised.
    """
    finished = threading.Event()
    timeout_reason = None

    def timeout():
        nonlocal timeout_reason
        # Event.wait() returns True as soon as the exchange completes, so this
        # thread exits right away instead of lingering for the full timeout
        # after every command.
        if not finished.wait(WATCHDOG_TIMEOUT):
            timeout_reason = _dump_worker_log(command, "timed out")

    timeout_thread = threading.Thread(target=timeout)
    timeout_thread.daemon = True
    timeout_thread.start()

    try:
        try:
            _stream.write_json([command] + list(args))
            response = _stream.read_json()
            assert response == ["ok"], f"{_name} {response!r}"
        finally:
            # Only signal completion here. The recorded timeout reason must
            # survive so that the check below can actually fail.
            finished.set()
    except Exception:
        _dump_worker_log(command, "failed", sys.exc_info())
        raise
    else:
        assert timeout_reason is None, str(timeout_reason)
def stop():
    """Tell the watchdog worker to stop and close the stream to it.

    Does nothing if no stream to the worker has been created. Any exception
    raised while sending the "stop" command or closing the stream is logged
    and swallowed rather than propagated.
    """
    if _stream is not None:
        try:
            _invoke("stop")
            _stream.close()
        except Exception:
            log.swallow_exception()
def register_spawn(pid, name):
    """Send a "register_spawn" command for a process to the watchdog worker.

    Starts the watchdog first if no stream to the worker exists yet. Does
    nothing on Python 3.13 or later.

    Args:
        pid: PID of the spawned process.
        name: Name of the spawned process.
    """
    if sys.version_info < (3, 13):
        if _stream is None:
            start()
        _invoke("register_spawn", pid, name)
def unregister_spawn(pid, name):
    """Send an "unregister_spawn" command for a process to the watchdog worker.

    The stream to the worker must already exist; this is asserted. Does
    nothing on Python 3.13 or later.

    Args:
        pid: PID of the spawned process.
        name: Name of the spawned process.
    """
    if sys.version_info < (3, 13):
        assert _stream is not None
        _invoke("unregister_spawn", pid, name)
|