1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
from __future__ import annotations
import logging
import os
import shlex
import subprocess
_log = logging.getLogger("util.run")
def run(
*args,
cwd=None,
input=None,
capture_stdout=False,
capture_stderr=False,
shell=False,
env=None,
check=True,
quiet=False,
):
"""Runs a subprocess.
Args:
*args: The subprocess arguments.
cwd: The working directory. If None, specifies the current directory.
input: The optional input byte sequence.
capture_stdout: Whether to capture stdout.
capture_stderr: Whether to capture stderr.
shell: Whether to run using the shell.
env: The environment variables as a dict. If None, inherits the current
environment.
check: Whether to raise an error if the return code is not zero.
quiet: If true, do not print output from the subprocess.
Returns:
A subprocess.CompletedProcess instance.
"""
cmd = [*args]
_log.info(
"Running subprocess in '{}'\n {}".format(cwd or os.getcwd(), " ".join([shlex.quote(arg) for arg in cmd]))
)
def output(is_stream_captured):
return subprocess.PIPE if is_stream_captured else (subprocess.DEVNULL if quiet else None)
completed_process = subprocess.run(
cmd,
cwd=cwd,
check=check,
input=input,
stdout=output(capture_stdout),
stderr=output(capture_stderr),
env=env,
shell=shell,
)
_log.debug(f"Subprocess completed. Return code: {completed_process.returncode}")
return completed_process
|