1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213
|
############################################################################
# Copyright (C) Intel Corporation
#
# SPDX-License-Identifier: MIT
############################################################################
"""Command-line-like helper tools."""
import os
import shlex
import shutil
import subprocess # nosec
import sys
import time
from contextlib import contextmanager
# Verbose logging is enabled whenever the VERBOSE environment variable is
# set to any non-empty value.
VERBOSE = os.environ.get('VERBOSE', '') != ''
def _resolve_path(path):
    """Return a path string built from a string or a list of sub-paths.

    A list is joined with ``os.path.join``; any other value is returned
    unchanged.
    """
    return os.path.join(*path) if isinstance(path, list) else path
def _escape_cmd_arg(arg):
    """Quote/escape an argument for a command line call.

    Arguments containing a space or a double quote are wrapped in double
    quotes, with every embedded double quote doubled. Other arguments are
    returned unchanged.
    """
    needs_quoting = any(char in arg for char in (' ', '"'))
    if not needs_quoting:
        return arg
    doubled = arg.replace('"', '""')
    return f'"{doubled}"'
def log(message):
    """Write an activity message to stderr when VERBOSE is enabled."""
    if not VERBOSE:
        return
    print(f">> {message}", file=sys.stderr, flush=True)
@contextmanager
def pushd(dst):
    """Context manager that runs its body inside another working directory.

    ``dst`` is a string or a list of sub-paths, taken relative to the
    current working directory. The previous working directory is restored
    when the ``with`` body exits, including when it raises.
    """
    origin = os.getcwd()
    destination = os.path.join(origin, _resolve_path(dst))
    os.chdir(destination)
    log(f'pushd {destination}')
    try:
        yield
    finally:
        log(f'popd -> {origin}')
        os.chdir(origin)
#pylint: disable=invalid-name
def rm(target):
    """Delete a file or folder, retrying briefly on permission errors.

    ``target`` is a string or a list of sub-paths. Nothing happens if the
    target does not exist. Real directories are removed recursively;
    everything else (regular files, symbolic links, special files) is
    removed with ``os.remove``.

    Deletion is retried once per second for up to 10 seconds while it fails
    with ``PermissionError``; if it still fails, the function gives up
    without raising. Other exceptions propagate to the caller.
    """
    target = _resolve_path(target)
    if not os.path.exists(target):
        return
    # Delete sometimes fails if done immediately; a timeout is not great,
    # but allows filesystem settings to stabilize.
    deadline = time.time() + 10
    while time.time() < deadline:
        try:
            if os.path.isdir(target) and not os.path.islink(target):
                log(f'rm -rf {target}')
                shutil.rmtree(target)
            else:
                # Covers regular files plus anything that is not a real
                # directory: rmtree refuses symlinks, and special files
                # previously made this loop spin until the timeout.
                log(f'rm {target}')
                os.remove(target)
            return
        except PermissionError:
            time.sleep(1)
    log(f'rm {target}: gave up after repeated permission errors')
#pylint: disable=invalid-name
def md(target):
    """Create a folder, including missing parents, if it is not there yet.

    ``target`` is a string or a list of sub-paths. An empty path or a path
    that already exists is left alone.
    """
    folder = _resolve_path(target)
    if not folder or os.path.exists(folder):
        return
    log(f'mkdir -p {folder}')
    os.makedirs(folder)
#pylint: disable=invalid-name
def cp(src, dest):
    """Copy a file or folder, replacing any existing destination.

    ``src`` and ``dest`` are strings or lists of sub-paths. If ``src`` does
    not exist, nothing is done. Otherwise any existing ``dest`` is deleted,
    the parent folder of ``dest`` is created if needed, and ``src`` is
    copied (file contents only for files, recursively for folders).

    Raises:
        RuntimeError: if ``src`` exists but is neither a regular file nor
            a directory.
    """
    src = _resolve_path(src)
    dest = _resolve_path(dest)
    if not os.path.exists(src):
        return
    if os.path.abspath(src) == os.path.abspath(dest):
        # Copying a path onto itself: deleting dest first would destroy
        # the source, so there is nothing to do.
        return
    rm(dest)
    # Make sure the folder that will hold the copy exists (the source's
    # folder necessarily exists already).
    md(os.path.dirname(dest))
    if os.path.isfile(src):
        log(f'cp {src} {dest}')
        shutil.copyfile(src, dest)
    elif os.path.isdir(src):
        log(f'cp {src} {dest}')
        shutil.copytree(src, dest)
    else:
        raise RuntimeError("Cannot copy unknown file type")
def join_command(command):
    """Build one command string from a series of parameters.

    Each parameter is escaped as needed and the results are joined with
    single spaces.
    """
    return ' '.join(map(_escape_cmd_arg, command))
def run_command(*args, no_throw=False, env=None):
    """Run a command through the shell and return its exit code.

    Args:
        *args: either a single ready-made command string, or the separate
            parameters of a command, which are escaped and joined.
        no_throw: when true, a non-zero exit code is returned instead of
            raising.
        env: environment passed to ``subprocess.Popen`` (``None`` inherits
            the current environment).

    Returns:
        The exit code of the command.

    Raises:
        RuntimeError: if the command exits non-zero and ``no_throw`` is
            false.
    """
    if len(args) == 1:
        cmd = args[0]
    else:
        cmd = join_command(args)
    log(f'{cmd}')
    if os.name != 'nt':
        # Quote the whole command as one shell word so that commands
        # containing single quotes reach bash intact.
        cmd = "exec bash -c " + shlex.quote(cmd)
    with subprocess.Popen(cmd, shell=True, env=env) as proc:  # nosec
        proc.communicate()
    if not no_throw and proc.returncode != 0:
        raise RuntimeError("Error running command: " + cmd)
    return proc.returncode
def run_commands(*args, no_throw=False, env=None):
    """Run several commands as one script and return the exit code.

    The commands are written to a temporary script (``temp.bat`` on
    Windows, ``temp.sh`` elsewhere) in the current directory, which is
    executed through the shell and deleted afterwards.

    Args:
        *args: commands, each either a ready-made string or a sequence of
            parameters to escape and join. Empty entries are skipped.
        no_throw: when true, a non-zero exit code is returned instead of
            raising.
        env: environment passed to ``subprocess.Popen`` (``None`` inherits
            the current environment).

    Returns:
        The exit code of the script.

    Raises:
        RuntimeError: if the script exits non-zero and ``no_throw`` is
            false.
    """
    commands = []
    for arg in args:
        if not arg:
            continue
        if isinstance(arg, str):
            commands.append(arg)
        else:
            commands.append(join_command(arg))
    script_file = "temp.bat" if os.name == 'nt' else "temp.sh"
    with open(script_file, "w", encoding="utf-8") as script:
        log('echo "')
        for cmd in commands:
            log(f'{cmd}')
            script.write(cmd + "\n")
        log(f'" > {script_file}')
    log(f'{script_file}')
    if os.name == 'nt':
        cmd = script_file
    else:
        # The explicit ./ stops bash from looking the script up on PATH
        # before the current directory.
        cmd = "exec bash -c 'source ./" + script_file + "'"
    try:
        with subprocess.Popen(cmd, shell=True, env=env) as proc:  # nosec
            proc.communicate()
    finally:
        # Remove the script even if launching the process failed.
        rm(script_file)
    if not no_throw and proc.returncode != 0:
        raise RuntimeError("Error running: \n" + "\n".join(commands))
    return proc.returncode
def capture_command(*args, env=None):
    """Run a command through the shell and capture its output.

    Args:
        *args: either a single ready-made command string, or the separate
            parameters of a command, which are escaped and joined.
        env: environment passed to ``subprocess.Popen`` (``None`` inherits
            the current environment).

    Returns:
        A tuple ``(stdout, stderr, returncode)``; both streams are text.
        A non-zero exit code does not raise.
    """
    if len(args) == 1:
        cmd = args[0]
    else:
        cmd = join_command(args)
    log(f'{cmd}')
    if os.name != 'nt':
        # Quote the whole command as one shell word so that commands
        # containing single quotes reach bash intact.
        cmd = "exec bash -c " + shlex.quote(cmd)
    with subprocess.Popen(  # nosec
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            shell=True,
            env=env) as proc:
        result = proc.communicate()
    return (result[0], result[1], proc.returncode)
def capture_commands(*args, env=None):
    """Run several commands as one script and capture the output.

    The commands are written to a temporary script (``temp.bat`` on
    Windows, ``temp.sh`` elsewhere) in the current directory, which is
    executed through the shell and deleted afterwards.

    Args:
        *args: commands, each either a ready-made string or a sequence of
            parameters to escape and join. Empty entries are skipped.
        env: environment passed to ``subprocess.Popen`` (``None`` inherits
            the current environment).

    Returns:
        A tuple ``(stdout, stderr, returncode)``; both streams are text.
        A non-zero exit code does not raise.
    """
    commands = []
    for arg in args:
        if not arg:
            continue
        if isinstance(arg, str):
            commands.append(arg)
        else:
            commands.append(join_command(arg))
    script_file = "temp.bat" if os.name == 'nt' else "temp.sh"
    with open(script_file, "w", encoding="utf-8") as script:
        log('echo "')
        for cmd in commands:
            log(f'{cmd}')
            script.write(cmd + "\n")
        log(f'" > {script_file}')
    log(f'{script_file}')
    if os.name == 'nt':
        cmd = script_file
    else:
        # The explicit ./ stops bash from looking the script up on PATH
        # before the current directory.
        cmd = "exec bash -c 'source ./" + script_file + "'"
    try:
        with subprocess.Popen(  # nosec
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                shell=True,
                env=env) as proc:
            result = proc.communicate()
    finally:
        # Remove the script even if launching the process failed.
        rm(script_file)
    return (result[0], result[1], proc.returncode)
|