File: output.py

package info (click to toggle)
debugpy 1.8.12%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,424 kB
  • sloc: python: 14,451; sh: 184; makefile: 33
file content (110 lines) | stat: -rw-r--r-- 3,543 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root
# for license information.

import os
import re
import threading

from debugpy.common import log


class CapturedOutput(object):
    """Captures stdout and stderr of the debugged process."""

    def __init__(self, session, **fds):
        self.session = session
        self._lock = threading.Lock()
        self._chunks = {}
        self._worker_threads = []

        for stream_name, fd in fds.items():
            log.info("Capturing {0} {1}", session.debuggee_id, stream_name)
            self._capture(fd, stream_name)

    def __str__(self):
        return f"CapturedOutput[{self.session.id}]"

    def _worker(self, fd, name):
        chunks = self._chunks[name]
        try:
            while True:
                try:
                    chunk = os.read(fd, 0x1000)
                except Exception:
                    break
                if not len(chunk):
                    break

                lines = "\n".join(
                    repr(line) for line, _ in re.findall(b"(.+?(\n|$))", chunk)
                )
                log.info("{0} {1}:\n{2}", self.session.debuggee_id, name, lines)

                with self._lock:
                    chunks.append(chunk)
        finally:
            os.close(fd)

    def _capture(self, fd, name):
        assert name not in self._chunks
        self._chunks[name] = []

        thread = threading.Thread(
            target=lambda: self._worker(fd, name), name=f"{self} {name}"
        )
        thread.daemon = True
        thread.start()
        self._worker_threads.append(thread)

    def wait(self, timeout=None):
        """Wait for all remaining output to be captured."""
        if not self._worker_threads:
            return
        log.debug("Waiting for remaining {0} output...", self.session.debuggee_id)
        for t in self._worker_threads:
            t.join(timeout)
        self._worker_threads[:] = []

    def _output(self, which, encoding, lines):
        try:
            result = self._chunks[which]
        except KeyError:
            raise AssertionError(
                f"{which} was not captured for {self.session.debuggee_id}"
            )

        with self._lock:
            result = b"".join(result)
        if encoding is not None:
            result = result.decode(encoding)

        return result.splitlines() if lines else result

    def stdout(self, encoding=None):
        """Returns stdout captured from the debugged process, as a single string.

        If encoding is None, returns bytes. Otherwise, returns str.
        """
        return self._output("stdout", encoding, lines=False)

    def stderr(self, encoding=None):
        """Returns stderr captured from the debugged process, as a single string.

        If encoding is None, returns bytes. Otherwise, returns str.
        """
        return self._output("stderr", encoding, lines=False)

    def stdout_lines(self, encoding=None):
        """Returns stdout captured from the debugged process, as a list of lines.

        If encoding is None, each line is bytes. Otherwise, each line is str.
        """
        return self._output("stdout", encoding, lines=True)

    def stderr_lines(self, encoding=None):
        """Returns stderr captured from the debugged process, as a list of lines.

        If encoding is None, each line is bytes. Otherwise, each line is str.
        """
        return self._output("stderr", encoding, lines=True)