File: __init__.py

package info (click to toggle)
debugpy 1.8.12%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,424 kB
  • sloc: python: 14,451; sh: 184; makefile: 33
file content (129 lines) | stat: -rw-r--r-- 3,328 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.

"""A watchdog process for debuggee processes spawned by tests.

Interacts with the main test runner process over stdio, and keeps track of running
debugpy processes. If the test runner process goes down, any debugpy test processes
are automatically killed.
"""

__all__ = ["start", "register_spawn", "unregister_spawn"]

import atexit
import os
import psutil
import subprocess
import sys
import threading
import time

from debugpy.common import log, messaging
from tests.watchdog import worker

WATCHDOG_TIMEOUT = 10


_name = f"watchdog-{os.getpid()}"
_stream = None
_process = None
_worker_log_filename = None


def start():
    global _stream, _process, _worker_log_filename
    if _stream is not None:
        return
    if sys.version_info >= (3, 13):
        return

    args = [sys.executable, worker.__file__, str(os.getpid())]
    log.info(
        "Spawning {0} for tests-{1}:\n\n{2}",
        _name,
        os.getpid(),
        "\n".join(repr(s) for s in args),
    )

    _process = psutil.Popen(
        args, bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE
    )

    _stream = messaging.JsonIOStream(_process.stdout, _process.stdin, _name)

    event, _worker_log_filename = _stream.read_json()
    assert event == "watchdog"

    atexit.register(stop)


def _dump_worker_log(command, problem, exc_info=None):
    reason = f"{_name}.{command}() {problem}"
    if _worker_log_filename is None:
        reason += ", but there is no log."
    else:
        try:
            with open(_worker_log_filename) as f:
                worker_log = f.read()
        except Exception:
            reason += f", but log {_worker_log_filename} could not be retrieved."
        else:
            reason += f"; watchdog worker process log:\n\n{worker_log}"

    if exc_info is None:
        log.error("{0}", reason)
    else:
        log.swallow_exception("{0}", reason, exc_info=exc_info)
    return reason


def _invoke(command, *args):
    def timeout():
        time.sleep(WATCHDOG_TIMEOUT)
        if timeout.occurred is None:
            reason = _dump_worker_log(command, "timed out")
            timeout.occurred = reason

    timeout.occurred = None
    timeout_thread = threading.Thread(target=timeout)
    timeout_thread.daemon = True
    timeout_thread.start()
    try:
        try:
            _stream.write_json([command] + list(args))
            response = _stream.read_json()
            assert response == ["ok"], f"{_name} {response!r}"
        finally:
            timeout.occurred = False
    except Exception:
        _dump_worker_log(command, "failed", sys.exc_info())
        raise
    else:
        assert not timeout.occurred, str(timeout.occurred)


def stop():
    if _stream is None:
        return

    try:
        _invoke("stop")
        _stream.close()
    except Exception:
        log.swallow_exception()


def register_spawn(pid, name):
    if sys.version_info >= (3, 13):
        return
    if _stream is None:
        start()
    _invoke("register_spawn", pid, name)


def unregister_spawn(pid, name):
    if sys.version_info >= (3, 13):
        return
    assert _stream is not None
    _invoke("unregister_spawn", pid, name)