1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
import subprocess
import sys
import itertools
from typing import List, Dict
from colorama import Fore, Back, Style
import re
import time
# Compiled pattern matching ANSI escape sequences: ESC followed by either a
# single 7-bit C1 "Fe" byte or a full CSI control sequence. It is used to strip
# colour codes from text so that only the visible characters are counted when
# computing table column widths.
# The pattern is compiled with re.VERBOSE, so the whitespace and "#" comments
# inside the literal are ignored by the regex engine.
# Source: https://stackoverflow.com/a/14693789/15675011
ansi_escape = re.compile(r'''
\x1B # ESC
(?: # 7-bit C1 Fe (except CSI)
[@-Z\\-_]
| # or [ for CSI, followed by a control sequence
\[
[0-?]* # Parameter bytes
[ -/]* # Intermediate bytes
[@-~] # Final byte
)
''', re.VERBOSE)
class MatrixRunner:
    """Run a callback once for every configuration of a parameter matrix.

    ``matrix`` maps each parameter name to the sequence of values it may take;
    the configurations are the cartesian product of those sequences.
    ``exclude`` is an iterable of partial configurations (dicts): a
    configuration is dropped when it matches every key/value pair of any
    exclude entry. Configurations can additionally be narrowed from the
    command line with ``--slice=<key>:<value>`` arguments (read from
    ``sys.argv``); when any are given, a configuration is kept only if, for
    every sliced key, its value is one of the values listed for that key.
    """

    def __init__(self, matrix, exclude):
        self.matrix = matrix
        self.exclude = exclude
        # Must be populated before get_work(), which filters on it.
        self.include = self.parse_includes()
        self.keys = [*matrix.keys()]
        self.values = [*matrix.values()]
        self.results = {}  # insertion-ordered
        self.failed = False
        self.work = self.get_work()
        self.last_matrix_config = None
        self.current_matrix_config = None

    def parse_includes(self) -> Dict[str, List[str]]:
        """Collect ``--slice=<key>:<value>`` arguments from ``sys.argv``.

        Returns:
            A dict mapping each sliced key to the list of values given for it,
            in command-line order.

        Raises:
            ValueError: if a ``--slice=`` argument has no ``:`` separator.
        """
        prefix = "--slice="
        includes: Dict[str, List[str]] = {}
        for arg in sys.argv:
            if not arg.startswith(prefix):
                continue
            # Split on the first colon only so values may themselves contain ':'.
            key, separator, value = arg[len(prefix):].partition(":")
            if not separator:
                raise ValueError(f"expected --slice=<key>:<value>, got {arg!r}")
            includes.setdefault(key, []).append(value)
        return includes

    def run_command(self, *args: str, always_output=False, output_matcher=None) -> bool:
        """Run an external command and report whether it succeeded.

        Args:
            *args: The program and its arguments, one string each.
            always_output: When true, log stdout and stderr even on success.
            output_matcher: Optional callable taking the decoded stdout; a
                falsy return value marks the command as failed.

        Returns:
            True if the command exited with status 0 and the matcher (if any)
            accepted its output; False otherwise. Any failure also sets
            ``self.failed``.
        """
        self.log(f"{Fore.CYAN}{Style.BRIGHT}Running Command \"{' '.join(args)}\"{Style.RESET_ALL}")
        # perf_counter is monotonic, so the duration is immune to clock changes.
        start_time = time.perf_counter()
        completed = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        runtime = time.perf_counter() - start_time
        self.log(Style.RESET_ALL, end="")  # makefile in parallel sometimes messes up colors
        # Decode leniently: tool output is not guaranteed to be valid UTF-8 and
        # a decoding error must not abort the whole matrix run.
        stdout = completed.stdout.decode("utf-8", errors="replace")
        stderr = completed.stderr.decode("utf-8", errors="replace")
        timing = f"{Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}"
        if completed.returncode != 0:
            self.log(f"{Fore.RED}{Style.BRIGHT}Command failed{Style.RESET_ALL} {timing}")
            self._log_stream("stdout", stdout)
            self._log_stream("stderr", stderr)
            self.failed = True
            return False
        self.log(f"{Fore.GREEN}{Style.BRIGHT}Command succeeded{Style.RESET_ALL} {timing}")
        if always_output:
            self._log_stream("stdout", stdout)
            self._log_stream("stderr", stderr)
        elif stderr:
            self._log_stream("stderr", stderr)
        if output_matcher is not None and not output_matcher(stdout):
            self.failed = True
            return False
        return True

    def _log_stream(self, label, text):
        """Log a captured output stream under a ``<label>:`` heading."""
        self.log(f"{label}:")
        self.log(text, end="")

    def set_fail(self):
        """Mark the overall run as failed."""
        self.failed = True

    def current_config(self):
        """Return the configuration dict currently being run (None before run())."""
        return self.current_matrix_config

    def last_config(self):
        """Return the configuration dict that ran before the current one, if any."""
        return self.last_matrix_config

    def log(self, *args, **kwargs):
        """Print to stdout, flushing immediately."""
        print(*args, **kwargs, flush=True)

    def do_exclude(self, matrix_config, exclude):
        """Return True if ``matrix_config`` matches every key/value in ``exclude``."""
        return all(matrix_config[key] == value for key, value in exclude.items())

    def do_include(self, matrix_config, include):
        """Return True if ``matrix_config`` satisfies every slice in ``include``.

        An empty ``include`` accepts everything. Slice values parsed from the
        command line are always strings, so a matrix value is also compared by
        its ``str()`` form; this lets non-string values such as ``17`` be
        selected with ``--slice=std:17``.
        """
        return all(
            matrix_config[key] in allowed or str(matrix_config[key]) in allowed
            for key, allowed in include.items()
        )

    def assignment_to_matrix_config(self, assignment):
        """Pair the matrix keys with one tuple of chosen values, as a dict."""
        return dict(zip(self.matrix.keys(), assignment))

    def get_work(self):
        """Return the value tuples of all configurations that survive filtering.

        Tuples come from the cartesian product of the matrix values, in product
        order, minus those matched by an exclude entry or rejected by the
        command-line slices.
        """
        work = []
        for assignment in itertools.product(*self.matrix.values()):
            config = self.assignment_to_matrix_config(assignment)
            if any(self.do_exclude(config, excluded) for excluded in self.exclude):
                continue
            if not self.do_include(config, self.include):
                continue
            work.append(assignment)
        return work

    def run(self, fn):
        """Call ``fn(self)`` for every configuration, then print a summary.

        The result of each call is stored in ``self.results`` and its
        truthiness decides the colour of that row in the summary table. If
        ``self.failed`` is set once all configurations have run, the process
        exits with status 1.
        """
        rule = '=' * 10
        for position, assignment in enumerate(self.work):
            matrix_config = self.assignment_to_matrix_config(assignment)
            # Results are keyed by the index of each chosen value in its column.
            config_tuple = tuple(
                self.values[column].index(choice)
                for column, choice in enumerate(assignment)
            )
            config_str = ', '.join(map(str, matrix_config.values()))
            description = "blank config" if config_str == "" else f"config {config_str}"
            self.log(
                f"{Fore.BLUE}{Style.BRIGHT}{rule} [{position + 1}/{len(self.work)}] "
                f"Running with {description} {rule}{Style.RESET_ALL}"
            )
            self.last_matrix_config = self.current_matrix_config
            self.current_matrix_config = matrix_config
            self.results[config_tuple] = fn(self)
        self.print_results()
        if self.failed:
            self.log("🔴 Some checks failed")
            sys.exit(1)
        else:
            self.log("🟢 All checks passed")

    def adj_width(self, text):
        """Return how many characters of ``text`` belong to ANSI escape sequences."""
        return len(text) - len(ansi_escape.sub("", text))

    def print_table(self, table):
        """Log ``table`` (a list of rows of strings) as aligned ``|``-separated columns.

        The first row fixes the number of columns. Widths are computed from the
        visible text only; the field width of each cell is then padded by the
        length of its escape sequences so coloured cells still line up.
        """
        if not table:
            return
        column_widths = [1] * len(table[0])
        for row in table:
            for column, cell in enumerate(row):
                column_widths[column] = max(column_widths[column], len(ansi_escape.sub("", cell)))
        for row in table:
            line = "".join(
                "| {cell:{width}} ".format(cell=cell, width=column_widths[column] + self.adj_width(cell))
                for column, cell in enumerate(row)
            )
            self.log(line + "|")

    def print_results(self):
        """Log a table with one row per configuration run, green if it passed, red if not."""
        self.log("Results:")
        table = [self.keys]
        for config_tuple, passed in self.results.items():
            colour = Fore.GREEN if passed else Fore.RED
            table.append([
                f"{colour}{Style.BRIGHT}{self.values[column][choice]}{Style.RESET_ALL}"
                for column, choice in enumerate(config_tuple)
            ])
        self.print_table(table)
|