1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
|
# Copyright 2017 Lars Wirzenius
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# =*= License: GPL-3+ =*=
import io
import logging
import os
import selectors
import subprocess
import sys
# Module-wide flag: when true, progress messages and subprocess output are
# echoed to the console in addition to being logged. It is changed with
# set_verbose_progress().
_verbose = False
def set_verbose_progress(verbose):
    """Switch console echoing of progress and command output on or off.

    Args:
        verbose: New value for the module-wide verbosity flag; it is stored
            as given and later tested for truthiness.
    """
    global _verbose
    _verbose = verbose
def error(msg):
    """Report an error to the log and to standard error.

    The message is logged at error level together with the current
    exception information, then written to stderr prefixed with
    ``ERROR: `` and flushed immediately.

    Args:
        msg: The error text to report.
    """
    logging.error(msg, exc_info=True)
    stream = sys.stderr
    stream.write(f"ERROR: {msg}\n")
    stream.flush()
def progress(msg):
    """Record a progress message, echoing it to stdout in verbose mode.

    The message is always logged at info level. When verbose progress is
    enabled it is also written to standard output, followed by a newline,
    and the stream is flushed.

    Args:
        msg: The progress text to report.
    """
    logging.info(msg)
    if not _verbose:
        return
    out = sys.stdout
    out.write(f"{msg}\n")
    out.flush()
def _log_line(line, stream_out, stream_label):
if line:
line = line.decode("UTF8")
if _verbose:
stream_out.write(line)
stream_out.flush()
logging.debug("%s: %s", stream_label, line)
return bool(line)
def runcmd(argv, **kwargs):
    """Run a command, logging its output, and return what it wrote to stdout.

    The command runs with ``LC_ALL`` forced to ``C.UTF8``. Its stdout and
    stderr are read line by line as they become available and handed to
    ``_log_line`` for logging and optional echoing.

    Args:
        argv: The command and its arguments, passed to ``subprocess.Popen``.
        **kwargs: Extra keyword arguments for ``subprocess.Popen``. ``env``
            supplies the base environment (default: the current process
            environment); ``stdout`` and ``stderr`` default to pipes.

    Returns:
        The bytes the command wrote to stdout. Empty when the caller
        redirected ``stdout`` away from a pipe.

    Raises:
        RuncmdError: If the command exits with a non-zero status.
    """
    progress("Exec: %r" % (argv, ))

    # Work on a private copy so the LC_ALL override below never leaks into
    # the mapping the caller passed in (or into os.environ).
    base_env = kwargs.get("env")
    env = dict(os.environ if base_env is None else base_env)
    env["LC_ALL"] = "C.UTF8"
    kwargs["env"] = env
    kwargs.setdefault("stdout", subprocess.PIPE)
    kwargs.setdefault("stderr", subprocess.PIPE)

    # NOTE(review): this writes the whole environment to the debug log,
    # including any secrets it may hold.
    for name, value in env.items():
        logging.debug("ENV: %s=%s", name, value)

    captured = io.BytesIO()

    def _pump_stdout(stream_in):
        # Keep the raw bytes for the return value, then log/echo the line.
        line = stream_in.readline()
        captured.write(line)
        return _log_line(line, sys.stdout, "STDOUT")

    def _pump_stderr(stream_in):
        line = stream_in.readline()
        return _log_line(line, sys.stderr, "STDERR")

    # The context managers close the selector and the pipes, and reap the
    # child, even if reading or logging raises.
    with subprocess.Popen(argv, **kwargs) as p:
        with selectors.DefaultSelector() as selector:
            handlers = ((p.stdout, _pump_stdout), (p.stderr, _pump_stderr))
            for stream, handler in handlers:
                # A stream is None when the caller redirected it elsewhere.
                if stream is not None:
                    selector.register(stream, selectors.EVENT_READ, handler)

            # Read until every pipe reports end-of-file. A finished stream
            # is unregistered so select() does not keep returning it and
            # spin while the process is still running.
            while selector.get_map():
                for key, _ in selector.select():
                    handler = key.data
                    if not handler(key.fileobj):
                        selector.unregister(key.fileobj)

    # Leaving the Popen context has waited for the process to exit.
    if p.returncode != 0:
        raise RuncmdError("Program failed: {}".format(p.returncode))
    return captured.getvalue()
def runcmd_chroot(chroot, argv, *argvs, **kwargs):
    """Run a command inside a chroot by prefixing it with ``chroot <dir>``.

    Args:
        chroot: Directory handed to the ``chroot`` command as the new root.
        argv: The command and its arguments, as a list.
        *argvs: Extra positional arguments forwarded unchanged to ``runcmd``.
        **kwargs: Keyword arguments forwarded unchanged to ``runcmd``.

    Returns:
        Whatever ``runcmd`` returns for the wrapped command.
    """
    # NOTE(review): runcmd() in this file accepts a single positional
    # argument, so any non-empty *argvs would raise TypeError -- confirm
    # whether this parameter is still needed.
    return runcmd(["chroot", chroot] + argv, *argvs, **kwargs)
def _procdir(chroot):
proc = os.path.join(chroot, "proc")
if not os.path.exists(proc):
os.mkdir(proc, mode=0o755)
return proc
def _log_stdout(data):
logging.debug("STDOUT: %r", data)
return data
def _log_stderr(data):
logging.debug("STDERR: %r", data)
return data
class RuncmdError(Exception):
    """Raised by runcmd() when the command exits with a non-zero status."""
|