File: process_wrapper.py

package info (click to toggle)
qgis 3.40.14%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,185,400 kB
  • sloc: cpp: 1,616,418; python: 372,869; xml: 23,474; sh: 3,761; perl: 3,664; ansic: 2,829; sql: 2,137; yacc: 1,068; lex: 577; javascript: 540; lisp: 411; makefile: 155
file content (203 lines) | stat: -rw-r--r-- 7,385 bytes parent folder | download | duplicates (8)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""
***************************************************************************
    process_wrapper.py
    ---------------------
    Date                 : February 2023
    Copyright            : (C) 2023 by Yoann Quenach de Quivillic
    Email                : yoann dot quenach at gmail dot com
***************************************************************************
*                                                                         *
*   This program is free software; you can redistribute it and/or modify  *
*   it under the terms of the GNU General Public License as published by  *
*   the Free Software Foundation; either version 2 of the License, or     *
*   (at your option) any later version.                                   *
*                                                                         *
***************************************************************************
"""

import locale
import os
import subprocess
import signal
import sys
import time
from queue import Queue, Empty
from threading import Thread

from qgis.PyQt.QtCore import QObject, pyqtSignal


class ProcessWrapper(QObject):
    """Run a shell command in a subprocess and collect its output.

    In interactive mode (the default) the command runs in the background:
    reader threads forward whatever the process writes to ``sys.stdout`` /
    ``sys.stderr`` and the ``finished`` signal is emitted with the return
    code once the process terminates. In non-interactive mode the
    constructor blocks until the command has completed.

    Attributes set by this class:
        stdout: decoded text the process wrote to its standard output so far.
        stderr: decoded text the process wrote to its standard error so far.
        returncode: ``None`` until completion has been recorded, then the
            exit code of the process.
        p: the underlying ``subprocess.Popen`` object.
    """

    # Emitted with the process return code when the process has terminated
    finished = pyqtSignal(int)

    def __init__(self, command, interactive=True, parent=None):
        """Start ``command`` through the shell.

        :param command: command line passed to ``subprocess.Popen`` with ``shell=True``
        :param interactive: if True, return immediately and forward the process
            outputs from background threads; if False, block until the process
            has finished and store its decoded outputs and return code
        :param parent: optional parent QObject
        """
        super().__init__(parent)

        self.stdout = ""
        self.stderr = ""
        self.returncode = None

        options = {
            "stdout": subprocess.PIPE,
            "stdin": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "shell": True,
        }

        # On Unix, run the child in its own session (it calls setsid()).
        # This will allow killing the process and its children when pressing Ctrl+C if psutil is not available.
        # start_new_session is used rather than preexec_fn=os.setsid because
        # preexec_fn is documented as unsafe when the application uses threads.
        if hasattr(os, "setsid"):
            options["start_new_session"] = True

        # Create and start subprocess
        self.p = subprocess.Popen(command, **options)

        # Start in non-interactive mode, wait for the process to finish
        if not interactive:
            out, err = self.p.communicate()
            self.stdout = self.decode(out)
            self.stderr = self.decode(err)
            self.returncode = self.p.returncode
            return

        # Read process stdout and push to out queue
        self.q_out = Queue()
        self.t_out = Thread(
            daemon=True, target=self.enqueue_output, args=[self.p.stdout, self.q_out]
        )
        self.t_out.start()

        # Read process stderr and push to err queue
        self.q_err = Queue()
        self.t_err = Thread(
            daemon=True, target=self.enqueue_output, args=[self.p.stderr, self.q_err]
        )
        self.t_err.start()

        # Polls process and output both queues content to sys.stdout and sys.stderr
        self.t_queue = Thread(daemon=True, target=self.dequeue_output)
        self.t_queue.start()

    def enqueue_output(self, stream, queue):
        """Forward everything readable from ``stream`` to ``queue`` until end of stream.

        :param stream: buffered binary pipe of the managed process
        :param queue: queue receiving the raw ``bytes`` chunks
        """
        try:
            while True:
                # read1() performs at most one read on the underlying pipe, so it
                # returns as soon as some data is available instead of waiting
                # for a full buffer or a complete line (readline would block on
                # an unfinished line), while still avoiding one call per byte.
                chunk = stream.read1(4096)
                if not chunk:
                    # End of stream: process terminated or pipe closed
                    break
                queue.put(chunk)
        finally:
            # Close the stream even if reading failed
            stream.close()

    def __repr__(self):
        """Helpful representation of the managed process"""
        status = (
            "Running" if self.returncode is None else f"Completed ({self.returncode})"
        )
        return (
            f"ProcessWrapper object at {hex(id(self))}"
            f"\n - Status: {status}"
            f"\n - stdout: {self.stdout}"
            f"\n - stderr: {self.stderr}"
        )

    def decode(self, bytes):
        """Decode process output to text, never raising on undecodable content.

        :param bytes: raw content to decode (the parameter name is kept for
            backward compatibility although it shadows the builtin)
        :return: the decoded text
        """
        try:
            # Try to decode the content as utf-8 first
            return bytes.decode("utf8")
        except UnicodeDecodeError:
            pass

        try:
            # If it fails, fallback to the locale encoding.
            # getpreferredencoding(False) always returns an encoding name, whereas
            # the deprecated getdefaultlocale()[1] may be None.
            return bytes.decode(locale.getpreferredencoding(False))
        except (UnicodeDecodeError, LookupError):
            # If everything fails (undecodable content or unknown codec),
            # use the representation without the b'' wrapper
            return str(bytes)[2:-1]

    def read_content(self, queue, stream, is_stderr):
        """Write queue content to the standard stream and append it to the internal buffer

        :param queue: queue filled with ``bytes`` chunks by ``enqueue_output``
        :param stream: text stream the decoded content is written to
        :param is_stderr: if True append to ``self.stderr``, else to ``self.stdout``
        """
        # Drain everything currently available without blocking
        chunks = []
        while True:
            try:
                chunks.append(queue.get_nowait())
            except Empty:
                break

        text = self.decode(b"".join(chunks))
        if not text:
            return

        # Append to the internal buffer
        if is_stderr:
            self.stderr += text
        else:
            self.stdout += text

        stream.write(text)

    def dequeue_output(self):
        """Check process every 0.1s and forward its outputs to stdout and stderr"""

        # Poll process and forward its outputs to stdout and stderr
        while self.p.poll() is None:
            time.sleep(0.1)
            self.read_content(self.q_out, sys.stdout, is_stderr=False)
            self.read_content(self.q_err, sys.stderr, is_stderr=True)

        # At this point, the process has terminated, so we wait for the threads to finish
        self.t_out.join()
        self.t_err.join()

        # Read the remaining content of the queues
        self.read_content(self.q_out, sys.stdout, is_stderr=False)
        self.read_content(self.q_err, sys.stderr, is_stderr=True)

        # Set returncode and emit finished signal
        self.returncode = self.p.returncode
        self.finished.emit(self.returncode)

    def wait(self, timeout=1):
        """Wait for the managed process to finish.

        :param timeout: maximum time to wait, in seconds. If timeout is negative
            (e.g. -1) or None, waits indefinitely (and freeze the GUI)
        :raises subprocess.TimeoutExpired: if the process is still running
            once the timeout has elapsed
        """
        # Popen.wait() treats a negative timeout as already expired, so it has
        # to be translated to None to really wait indefinitely
        if timeout is not None and timeout < 0:
            timeout = None
        self.p.wait(timeout)

    def write(self, data):
        """Send data to the managed process

        :param data: text to send; a newline is appended and the result is
            utf-8 encoded before being written to the process standard input
        """
        try:
            self.p.stdin.write((data + "\n").encode("utf8"))
            self.p.stdin.flush()
        except BrokenPipeError:
            # The process no longer reads its standard input
            self.p.stdout.close()
            self.p.stderr.close()
            # NOTE(review): poll() may still return None here if the process
            # has not been reaped yet - confirm the int signal tolerates that
            self.finished.emit(self.p.poll())

    def kill(self):
        """Kill the managed process (and its children when possible)"""

        # Nothing to kill if the process has already terminated. poll() is used
        # rather than reading returncode so that the status is up to date.
        if self.p.poll() is not None:
            return

        # Process in run with shell=True, so calling self.p.kill() would only kill the shell
        # (i.e a text editor launched with !gedit would not close) so we need to iterate
        # over the child processes to kill them all

        try:
            import psutil
        except ImportError:
            psutil = None

        if psutil is not None:
            try:
                process = psutil.Process(self.p.pid)
                for child_process in process.children(recursive=True):
                    try:
                        child_process.kill()
                    except psutil.NoSuchProcess:
                        # The child exited in the meantime
                        pass
                process.kill()
            except psutil.NoSuchProcess:
                # The process exited in the meantime
                pass
            return

        # If psutil is not available, we try to use os.killpg to kill the process group (Unix only)
        try:
            os.killpg(os.getpgid(self.p.pid), signal.SIGTERM)
        except AttributeError:
            # If everything fails, simply kill the process. Children will not be killed
            self.p.kill()
        except ProcessLookupError:
            # The process exited in the meantime
            pass

    def __del__(self):
        """Ensure streams are closed and the process is killed when the wrapper is destroyed"""
        # self.p is missing if the constructor failed before creating the process
        process = getattr(self, "p", None)
        if process is None:
            return

        for stream in (process.stdout, process.stderr, process.stdin):
            if stream is None:
                continue
            try:
                stream.close()
            except (OSError, ValueError):
                # e.g. BrokenPipeError while flushing stdin of a dead process;
                # a failure to close must not prevent killing the process
                pass

        try:
            self.kill()
        except ProcessLookupError:
            pass