File: worker.py

package info (click to toggle)
debugpy 1.8.12%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 1,424 kB
  • sloc: python: 14,451; sh: 184; makefile: 33
file content (158 lines) | stat: -rw-r--r-- 4,844 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.

"""The main script for the watchdog worker process.
"""

# This code runs in a separate process, and should not import pytest or tests!
# Do not import debugpy on top level, either - sys.path needs to be fixed first -
# this is done in main().

import collections
import psutil
import sys
import time


ProcessInfo = collections.namedtuple("ProcessInfo", ["process", "name"])


def main(tests_pid):
    from debugpy.common import log, messaging

    # log.stderr_levels |= {"info"}
    log.timestamp_format = "06.3f"
    log_file = log.to_file(prefix="tests.watchdog")

    stream = messaging.JsonIOStream.from_stdio(f"tests-{tests_pid}")
    log.info("Spawned WatchDog-{0} for tests-{0}", tests_pid)
    tests_process = psutil.Process(tests_pid)
    stream.write_json(["watchdog", log_file.filename])

    spawned_processes = {}  # pid -> ProcessInfo
    try:
        stop = False
        while not stop:
            try:
                message = stream.read_json()
            except Exception:
                break

            command = message[0]
            args = message[1:]

            if command == "stop":
                assert not args
                stop = True

            elif command == "register_spawn":
                pid, name = args
                pid = int(pid)

                log.info(
                    "WatchDog-{0} registering spawned process {1} (pid={2})",
                    tests_pid,
                    name,
                    pid,
                )
                try:
                    _, old_name = spawned_processes[pid]
                except KeyError:
                    pass
                else:
                    log.warning(
                        "WatchDog-{0} already tracks a process with pid={1}: {2}",
                        tests_pid,
                        pid,
                        old_name,
                    )
                spawned_processes[pid] = ProcessInfo(psutil.Process(pid), name)

            elif command == "unregister_spawn":
                pid, name = args
                pid = int(pid)

                log.info(
                    "WatchDog-{0} unregistering spawned process {1} (pid={2})",
                    tests_pid,
                    name,
                    pid,
                )
                spawned_processes.pop(pid, None)

            else:
                raise AssertionError(f"Unknown watchdog command: {command!r}")

            stream.write_json(["ok"])

    except Exception as exc:
        stream.write_json(["error", str(exc)])
        log.reraise_exception()

    finally:
        try:
            stream.close()
        except Exception:
            log.swallow_exception()

        # If the test runner becomes a zombie process, it is still considered alive,
        # and wait() will block indefinitely. Poll status instead.
        while True:
            try:
                status = tests_process.status()
            except Exception:
                # If we can't even get its status, assume that it's dead.
                break

            # If it's dead or a zombie, time to clean it up.
            if status in (psutil.STATUS_DEAD, psutil.STATUS_ZOMBIE):
                break

            # Otherwise, let's wait a bit to see if anything changes.
            try:
                tests_process.wait(0.1)
            except Exception:
                pass

        leftover_processes = {proc for proc, _ in spawned_processes.values()}
        for proc, _ in spawned_processes.values():
            try:
                leftover_processes |= proc.children(recursive=True)
            except Exception:
                pass

        leftover_processes = {proc for proc in leftover_processes if proc.is_running()}
        if not leftover_processes:
            return

        # Wait a bit to allow the terminal to catch up on the test runner output.
        time.sleep(0.3)

        log.newline(level="warning")
        log.warning(
            "tests-{0} process terminated unexpectedly, and left some orphan child "
            "processes behind: {1!r}",
            tests_pid,
            sorted({proc.pid for proc in leftover_processes}),
        )

        for proc in leftover_processes:
            log.warning(
                "WatchDog-{0} killing orphaned test child process (pid={1})",
                tests_pid,
                proc.pid,
            )

            try:
                proc.kill()
            except psutil.NoSuchProcess:
                pass
            except Exception:
                log.swallow_exception()

        log.info("WatchDog-{0} exiting", tests_pid)


if __name__ == "__main__":
    main(int(sys.argv[1]))