1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
import os
import sys
import time
from subprocess import CalledProcessError, check_call
from typing import List, IO, Optional, Tuple
def which(command: str, paths: Optional[str] = None) -> Optional[str]:
    """which(command, [paths]) - Look up the given command in the paths string
    (or the PATH environment variable, if unspecified).

    Only regular files are considered a match, so a directory that happens
    to share the command's name never shadows the real command.

    :param command: command name, or a path to the command itself.
    :param paths: search path entries joined with os.pathsep; when None the
                  PATH environment variable is used, and when that is empty
                  too, os.defpath.
    :return: path to the first matching file, or None if nothing was found.
    """
    if paths is None:
        paths = os.environ.get('PATH', '')

    # Check for absolute match first.
    if os.path.isfile(command):
        return command

    # NOTE: shutil.which is the stdlib counterpart, but it additionally
    # requires the execute permission; this helper only needs the file
    # to exist.
    if not paths:
        paths = os.defpath

    # Get suffixes to search.
    # On Cygwin, 'PATHEXT' may exist but it should not be used.
    if os.pathsep == ';':
        pathext = os.environ.get('PATHEXT', '').split(';')
    else:
        pathext = ['']

    # Search the paths...
    for path in paths.split(os.pathsep):
        for ext in pathext:
            candidate = os.path.join(path, command + ext)
            if os.path.isfile(candidate):
                return candidate

    return None
def has_no_extension(file_name: str) -> bool:
    """
    Tell whether the given file name comes without an extension.
    """
    extension = os.path.splitext(file_name)[1]
    return not extension
def is_valid_single_input_file(file_name: str) -> bool:
    """
    Tell whether the extension of the given file name is one of the
    accepted ones: .i, .ii, .c, .cpp, .m or no extension at all.
    """
    accepted_extensions = (".i", ".ii", ".c", ".cpp", ".m", "")
    extension = os.path.splitext(file_name)[1]
    return extension in accepted_extensions
def time_to_str(time: float) -> str:
    """
    Render a duration given in seconds as a human-readable string
    with two digits after the decimal point and an "s" suffix.
    """
    return "{:.2f}s".format(time)
def memory_to_str(memory: int) -> str:
    """
    Render a number of bytes as a human-readable string.

    A zero value is reported as "N/A". Otherwise the value is formatted
    by humanize when that package is importable, and printed as a plain
    byte count when it is not.
    """
    # A zero reading means that measuring the memory didn't succeed.
    if not memory:
        return "N/A"

    try:
        from humanize import naturalsize
        return naturalsize(memory, gnu=True)
    except ImportError:
        # no formatter installed, let's keep it in bytes
        return f"{memory}B"
def check_and_measure_call(*popenargs, **kwargs) -> Tuple[float, int]:
    """
    Run command with arguments. Wait for command to complete and measure
    execution time and peak memory consumption.

    If the exit code was zero then return, otherwise raise
    CalledProcessError. The CalledProcessError object will have the
    return code in the returncode attribute.

    The arguments are the same as for the call and check_call functions.

    Peak memory is the largest observed sum of resident set sizes of the
    process and all of its descendants, sampled every half a second.
    It is reported as 0 when psutil is not installed or when the command
    finished before the first sample was taken.

    Return a tuple of execution time (in seconds) and peak memory
    (in bytes).
    """
    # Only the import itself is guarded: an ImportError raised while the
    # command is already running must not make us run the command again.
    try:
        import psutil as ps
    except ImportError:
        # back off to subprocess if we don't have psutil installed
        start_time = time.monotonic()
        check_call(*popenargs, **kwargs)
        return time.monotonic() - start_time, 0

    def get_memory(process: ps.Process) -> int:
        mem = 0

        # we want to gather memory usage from all of the child processes
        descendants = list(process.children(recursive=True))
        descendants.append(process)

        for subprocess in descendants:
            try:
                mem += subprocess.memory_info().rss
            except (ps.NoSuchProcess, ps.AccessDenied):
                continue

        return mem

    def still_running(process: ps.Process) -> bool:
        try:
            return (process.is_running() and
                    process.status() != ps.STATUS_ZOMBIE)
        except ps.NoSuchProcess:
            # the process went away between the two queries
            return False

    peak_mem = 0
    # A monotonic clock is immune to system clock adjustments.
    start_time = time.monotonic()

    with ps.Popen(*popenargs, **kwargs) as process:
        # while the process is running calculate resource utilization.
        while still_running(process):
            # track the peak utilization of the process
            peak_mem = max(peak_mem, get_memory(process))
            time.sleep(.5)

        # The loop also ends for a zombie, which is still reported as
        # running: get rid of it.
        if process.is_running():
            process.kill()

    if process.returncode != 0:
        cmd = kwargs.get("args")
        if cmd is None:
            cmd = popenargs[0]
        raise CalledProcessError(process.returncode, cmd)

    return time.monotonic() - start_time, peak_mem
def run_script(script_path: str, build_log_file: IO, cwd: str,
               out=sys.stdout, err=sys.stderr, verbose: int = 0):
    """
    Run the provided script if it exists.

    The script is made executable and then executed through the shell with
    'cwd' as the working directory. Both stdout and stderr of the commands
    go to 'build_log_file'. If either command fails, an error message is
    written to 'err' and the interpreter exits with code -1.

    :param script_path: path to the script; nothing happens if it is missing.
    :param build_log_file: open file receiving the output of the commands.
    :param cwd: working directory for the commands.
    :param out: stream for the progress message.
    :param err: stream for the error message.
    :param verbose: the progress message is printed only when it equals 1.
    """
    # Function-scope import: nothing else in this block's scope needs it.
    import shlex

    if not os.path.exists(script_path):
        return

    # Quote the path for the shell properly, so that paths with quotes,
    # spaces or other shell metacharacters are passed through verbatim.
    quoted_path = shlex.quote(script_path)

    try:
        if verbose == 1:
            out.write(f" Executing: {script_path}\n")

        check_call(f"chmod +x {quoted_path}", cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

        check_call(quoted_path, cwd=cwd,
                   stderr=build_log_file,
                   stdout=build_log_file,
                   shell=True)

    except CalledProcessError:
        err.write(f"Error: Running {script_path} failed. "
                  f"See {build_log_file.name} for details.\n")
        sys.exit(-1)
def is_comment_csv_line(entries: List[str]) -> bool:
    """
    Tell whether a CSV line is a comment, i.e. it has at least one entry
    and its first entry starts with a '#'.
    """
    if not entries:
        return False

    first_entry = entries[0]
    return first_entry.startswith("#")
|