# -*- coding: utf-8 -*-
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
""" This module is a collection of methods commonly used in this project. """
import collections
import functools
import json
import logging
import os
import os.path
import re
import shlex
import subprocess
import sys
# Name of the environment variable that carries the JSON-encoded wrapper
# parameters (written by `wrapper_environment`, read by `compiler_wrapper`).
ENVIRONMENT_KEY = "INTERCEPT_BUILD"

# Record of one compiler invocation: process id, working directory and the
# command as a list of tokens.
Execution = collections.namedtuple("Execution", "pid cwd cmd")

# Cross translation unit settings.
# NOTE(review): field meanings are not visible in this module; presumably
# `collect`/`analyze` are phase flags -- confirm against the callers.
CtuConfig = collections.namedtuple(
    "CtuConfig", ("collect", "analyze", "dir", "extdef_map_cmd")
)
def duplicate_check(method):
    """Build a predicate which detects duplicated entries.

    Entries are represented as dictionaries, which have no default hash
    method, so the caller supplies one that maps an entry to a unique
    hashable value.

    :param method: callable which computes the unique hash of an entry
    :return: a callable which returns False the first time an entry's hash
        is seen and True for every later entry with the same hash. The hash
        method and the set of seen hashes are exposed on the callable as the
        `unique` and `state` attributes."""

    def predicate(entry):
        key = predicate.unique(entry)
        seen_before = key in predicate.state
        if not seen_before:
            # First occurrence: remember it for the following calls.
            predicate.state.add(key)
        return seen_before

    predicate.unique = method
    predicate.state = set()
    return predicate
def run_build(command, *args, **kwargs):
    """Run the build command and report its execution in the debug log.

    Every extra positional and keyword argument is passed through to
    `subprocess.call` unchanged.

    :param command: array of tokens
    :return: exit code of the process
    """
    # Only used for the log message: the explicit environment when the
    # caller passed one, the current process environment otherwise.
    shown_environment = kwargs["env"] if "env" in kwargs else os.environ
    logging.debug(
        "run build %s, in environment: %s", command, shown_environment
    )

    status = subprocess.call(command, *args, **kwargs)

    logging.debug("build finished with exit code: %d", status)
    return status
def run_command(command, cwd=None):
    """Run a given command and report the execution.

    The standard error of the command is merged into its standard output.

    :param command: array of tokens
    :param cwd: the working directory where the command will be executed;
        the current working directory is used when it is not given
    :return: output of the command as a list of lines
    :raises subprocess.CalledProcessError: when the command exits with a
        non-zero code; the `output` attribute of the exception is replaced
        with the decoded list of output lines
    """

    def decode_when_needed(result):
        """check_output returns bytes or string depend on python version"""
        if isinstance(result, bytes):
            # Tool output is not guaranteed to be valid UTF-8. Substitute
            # the undecodable bytes instead of raising UnicodeDecodeError,
            # which would otherwise hide the real outcome of the command.
            return result.decode("utf-8", "replace")
        return result

    directory = os.path.abspath(cwd) if cwd else os.getcwd()
    logging.debug("exec command %s in %s", command, directory)
    try:
        output = subprocess.check_output(
            command, cwd=directory, stderr=subprocess.STDOUT
        )
    except subprocess.CalledProcessError as ex:
        ex.output = decode_when_needed(ex.output).splitlines()
        # Bare raise keeps the original traceback untouched.
        raise
    return decode_when_needed(output).splitlines()
def reconfigure_logging(verbose_level):
    """Reconfigure logging level and format based on the verbose flag.

    The root logger level is lowered by 10 for every `-v` flag, starting
    from WARNING and never going below zero. The handlers of the root
    logger are replaced by a single handler which writes to the standard
    output.

    :param verbose_level: number of `-v` flags received by the command
    :return: no return value
    """
    # Exit when nothing to do.
    if verbose_level == 0:
        return

    # Be verbose with messages: the function name is only included above
    # three `-v` flags.
    layout = (
        "%(name)s: %(levelname)s: %(message)s"
        if verbose_level <= 3
        else "%(name)s: %(levelname)s: %(funcName)s: %(message)s"
    )
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(logging.Formatter(fmt=layout))

    # Tune logging level.
    reduction = min(logging.WARNING, 10 * verbose_level)

    root_logger = logging.getLogger()
    root_logger.setLevel(logging.WARNING - reduction)
    root_logger.handlers = [stdout_handler]
def command_entry_point(function):
    """Decorator for command entry methods.

    The decorator initializes and shuts down logging, and guards against
    programming errors by catching exceptions.

    The decorated method can have arbitrary parameters, its return value
    will be the exit code of the process. A keyboard interrupt is turned
    into exit code 130, any other exception into exit code 64."""

    interrupted_code = 130  # Signal received exit code for bash.
    internal_error_code = 64  # Some non used exit code for internal errors.

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """Do housekeeping tasks and execute the wrapped method."""

        try:
            logging.basicConfig(
                format="%(name)s: %(message)s",
                level=logging.WARNING,
                stream=sys.stdout,
            )
            # This hack to get the executable name as %(name).
            program_name = os.path.basename(sys.argv[0])
            logging.getLogger().name = program_name
            return function(*args, **kwargs)
        except KeyboardInterrupt:
            logging.warning("Keyboard interrupt")
            return interrupted_code
        except Exception:
            logging.exception("Internal error.")
            # With debug logging on, the output is already detailed enough
            # for a bug report; otherwise ask for a more verbose rerun.
            debugging = logging.getLogger().isEnabledFor(logging.DEBUG)
            hint = (
                "Please report this bug and attach the output to the bug report"
                if debugging
                else "Please run this command again and turn on "
                "verbose mode (add '-vvvv' as argument)."
            )
            logging.error(hint)
            return internal_error_code
        finally:
            logging.shutdown()

    return wrapper
def compiler_wrapper(function):
    """Implements compiler wrapper base functionality.

    A compiler wrapper executes the real compiler, then implements some
    functionality, then returns with the real compiler exit code.

    The wrapper parameters are read as JSON from the environment variable
    named by ENVIRONMENT_KEY. A missing or malformed variable, or a failure
    to start the real compiler, is not handled here and crashes the wrapper.

    :param function: the extra functionality what the wrapper want to
                     do on top of the compiler call. If it throws exception,
                     it will be caught and logged.
    :return: the exit code of the real compiler.

    The :param function: will receive the following arguments:

    :param result:       the exit code of the compilation.
    :param execution:    the command executed by the wrapper."""

    def is_cxx_compiler():
        """Find out was it a C++ compiler call. Compiler wrapper names
        contain the compiler type. C++ compiler wrappers ends with `c++`,
        but might have `.exe` extension on windows."""

        wrapper_command = os.path.basename(sys.argv[0])
        return re.match(r"(.+)c\+\+(.*)", wrapper_command) is not None

    def run_compiler(executable):
        """Execute compilation with the real compiler."""

        # `executable` is a list of tokens, the arguments received by the
        # wrapper are appended to it unchanged.
        command = executable + sys.argv[1:]
        logging.debug("compilation: %s", command)
        result = subprocess.call(command)
        logging.debug("compilation exit code: %d", result)
        return result

    # Get relevant parameters from environment.
    parameters = json.loads(os.environ[ENVIRONMENT_KEY])
    reconfigure_logging(parameters["verbose"])
    # Execute the requested compilation. Do crash if anything goes wrong.
    cxx = is_cxx_compiler()
    compiler = parameters["cxx"] if cxx else parameters["cc"]
    result = run_compiler(compiler)
    # Call the wrapped method and ignore its return value.
    try:
        call = Execution(
            pid=os.getpid(),
            cwd=os.getcwd(),
            cmd=["c++" if cxx else "cc"] + sys.argv[1:],
        )
        function(result, call)
    except Exception:
        # Errors of the extra functionality must not change the outcome of
        # the build. Only `Exception` is caught, so a keyboard interrupt or
        # an interpreter exit request still propagates to the caller.
        logging.exception("Compiler wrapper failed complete.")
    # Always return the real compiler exit code.
    return result
def wrapper_environment(args):
    """Set up environment for interpose compiler wrapper.

    :param args: object with the `verbose`, `cc` and `cxx` attributes; the
        compiler attributes are command line strings which are split into
        tokens here
    :return: dictionary with a single entry, which maps ENVIRONMENT_KEY to
        the JSON-encoded wrapper parameters
    """
    settings = {"verbose": args.verbose}
    # The compilers are stored as token lists.
    for compiler_kind in ("cc", "cxx"):
        settings[compiler_kind] = shlex.split(getattr(args, compiler_kind))
    return {ENVIRONMENT_KEY: json.dumps(settings)}
|