File: util.py

package info (click to toggle)
cpptrace 1.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,996 kB
  • sloc: cpp: 15,646; python: 962; ansic: 155; sh: 103; makefile: 86
file content (159 lines) | stat: -rw-r--r-- 6,295 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import subprocess
import sys
import itertools
from typing import List, Dict
from colorama import Fore, Back, Style
import re
import time

# Compiled pattern matching ANSI escape sequences (ESC followed by either a
# single 7-bit C1 byte or a full CSI control sequence). MatrixRunner strips
# these from colored cells to measure their visible width when laying out the
# results table.
# Pattern taken from https://stackoverflow.com/a/14693789/15675011
ansi_escape = re.compile(r'''
    \x1B  # ESC
    (?:   # 7-bit C1 Fe (except CSI)
        [@-Z\\-_]
    |     # or [ for CSI, followed by a control sequence
        \[
        [0-?]*  # Parameter bytes
        [ -/]*  # Intermediate bytes
        [@-~]   # Final byte
    )
''', re.VERBOSE)

class MatrixRunner:
    """Run a callback once for every configuration in a build/test matrix.

    ``matrix`` maps each dimension name to the list of values it can take;
    the work list is the cartesian product of those values, minus any
    configuration matched by an entry of ``exclude`` and, when ``--slice``
    arguments are present on the command line, restricted to the values
    they name.

    Attributes set by the constructor:
      matrix / exclude  -- the arguments as given
      include           -- ``--slice`` selections parsed from ``sys.argv``
      keys / values     -- matrix dimension names and their value lists
      results           -- maps a tuple of value indices to the callback result
      failed            -- True once any command failed or set_fail() was called
      work              -- list of value tuples that will be run
    """

    def __init__(self, matrix, exclude):
        self.matrix = matrix
        self.exclude = exclude
        self.include = self.parse_includes()
        self.keys = [*matrix.keys()]
        self.values = [*matrix.values()]
        self.results = {} # insertion-ordered
        self.failed = False
        self.work = self.get_work()

        self.last_matrix_config = None
        self.current_matrix_config = None

    def parse_includes(self) -> Dict[str, List[str]]:
        """Collect ``--slice=key:value`` arguments from ``sys.argv``.

        Returns a dict mapping each key to the list of values given for it,
        in command-line order. Only the first ``:`` separates key from value,
        so values may themselves contain colons.

        Raises:
            ValueError: if a ``--slice=`` argument contains no ``:``.
        """
        prefix = "--slice="
        includes: Dict[str, List[str]] = {}
        for arg in sys.argv:
            if not arg.startswith(prefix):
                continue
            # partition (rather than split) keeps any further ':' in the value
            key, sep, value = arg[len(prefix):].partition(":")
            if not sep:
                raise ValueError(
                    f"Malformed slice argument {arg!r}; expected --slice=key:value"
                )
            includes.setdefault(key, []).append(value)
        return includes

    def _log_stream(self, label, text):
        """Log a captured output stream under a ``label:`` heading."""
        self.log(f"{label}:")
        self.log(text, end="")

    def run_command(self, *args: str, always_output=False, output_matcher=None) -> bool:
        """Run a command, log its outcome, and report whether it succeeded.

        Args:
            *args: the program and its arguments, passed to the subprocess
                without a shell.
            always_output: when True, log stdout and stderr even on success
                (on success stderr is otherwise logged only if non-empty).
            output_matcher: optional callable given the decoded stdout of a
                successful command; a falsy return counts as a failure.

        Returns:
            True if the command exited with status 0 and ``output_matcher``
            (if any) accepted its stdout; False otherwise. Any failure also
            sets ``self.failed``.
        """
        self.log(f"{Fore.CYAN}{Style.BRIGHT}Running Command \"{' '.join(args)}\"{Style.RESET_ALL}")
        start_time = time.time()
        completed = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        runtime = time.time() - start_time
        self.log(Style.RESET_ALL, end="") # makefile in parallel sometimes messes up colors
        # Decode once; errors="replace" so stray non-UTF-8 bytes in tool
        # output cannot crash the runner.
        stdout = completed.stdout.decode("utf-8", errors="replace")
        stderr = completed.stderr.decode("utf-8", errors="replace")
        if completed.returncode != 0:
            self.log(f"{Fore.RED}{Style.BRIGHT}Command failed{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
            self._log_stream("stdout", stdout)
            self._log_stream("stderr", stderr)
            self.failed = True
            return False
        self.log(f"{Fore.GREEN}{Style.BRIGHT}Command succeeded{Style.RESET_ALL} {Fore.MAGENTA}(time: {runtime:.2f}s){Style.RESET_ALL}")
        if always_output:
            self._log_stream("stdout", stdout)
            self._log_stream("stderr", stderr)
        elif completed.stderr:
            self._log_stream("stderr", stderr)
        if output_matcher is not None and not output_matcher(stdout):
            self.failed = True
            return False
        return True

    def set_fail(self):
        """Mark the whole run as failed."""
        self.failed = True

    def current_config(self):
        """Return the matrix configuration dict currently being run."""
        return self.current_matrix_config

    def last_config(self):
        """Return the configuration dict run before the current one (or None)."""
        return self.last_matrix_config

    def log(self, *args, **kwargs):
        """Print with an immediate flush."""
        print(*args, **kwargs, flush=True)

    def do_exclude(self, matrix_config, exclude):
        """Return True if ``matrix_config`` matches every key/value of ``exclude``."""
        return all(matrix_config[k] == v for k, v in exclude.items())

    def do_include(self, matrix_config, include):
        """Return True if ``matrix_config`` is selected by ``include``.

        An empty ``include`` selects everything. Otherwise, for every key in
        ``include`` the configuration's value must be one of the listed
        values. A key in ``include`` that is not a matrix dimension raises
        KeyError.
        """
        if not include:
            return True
        return all(matrix_config[k] in allowed for k, allowed in include.items())

    def assignment_to_matrix_config(self, assignment):
        """Pair the matrix keys with a tuple of values, returning a dict."""
        return dict(zip(self.matrix.keys(), assignment))

    def get_work(self):
        """Return every value tuple of the matrix that survives exclude/include filtering."""
        work = []
        for assignment in itertools.product(*self.matrix.values()):
            config = self.assignment_to_matrix_config(assignment)
            if any(self.do_exclude(config, ex) for ex in self.exclude):
                continue
            if not self.do_include(config, self.include):
                continue
            work.append(assignment)
        return work

    def run(self, fn):
        """Call ``fn(self)`` for each work item, then print a summary table.

        The return value of ``fn`` is stored as that configuration's result.
        Exits the process with status 1 if ``self.failed`` is set once all
        work items have run.
        """
        total = len(self.work)
        for i, assignment in enumerate(self.work):
            matrix_config = self.assignment_to_matrix_config(assignment)
            # Results are keyed by each value's index within its dimension.
            config_tuple = tuple(
                self.values[dim].index(value) for dim, value in enumerate(assignment)
            )
            config_str = ', '.join(str(v) for v in matrix_config.values())
            if config_str == "":
                self.log(f"{Fore.BLUE}{Style.BRIGHT}{'=' * 10} [{i + 1}/{total}] Running with blank config {'=' * 10}{Style.RESET_ALL}")
            else:
                self.log(f"{Fore.BLUE}{Style.BRIGHT}{'=' * 10} [{i + 1}/{total}] Running with config {config_str} {'=' * 10}{Style.RESET_ALL}")
            self.last_matrix_config = self.current_matrix_config
            self.current_matrix_config = matrix_config
            self.results[config_tuple] = fn(self)
        self.print_results()
        if self.failed:
            self.log("🔴 Some checks failed")
            sys.exit(1)
        else:
            self.log("🟢 All checks passed")

    def adj_width(self, text):
        """Return how many characters of ``text`` belong to ANSI escape sequences."""
        return len(text) - len(ansi_escape.sub("", text))

    def print_table(self, table):
        """Log ``table`` (a list of rows of strings) as ``|``-separated columns.

        Column widths are computed from the visible text, ignoring ANSI
        escape sequences; the first row determines the number of columns.
        An empty table logs nothing.
        """
        if not table:
            return
        column_widths = [1] * len(table[0])
        for row in table:
            for j, cell in enumerate(row):
                column_widths[j] = max(column_widths[j], len(ansi_escape.sub("", cell)))
        for row in table:
            for j, cell in enumerate(row):
                # Pad by the escape-sequence length too, since str.format
                # counts those invisible characters toward the width.
                width = column_widths[j] + self.adj_width(cell)
                self.log("| {cell:{width}} ".format(cell=cell, width=width), end="")
            self.log("|")

    def print_results(self):
        """Log a table of all run configurations, colored green/red by result."""
        self.log("Results:")
        table = [self.keys]
        for config_tuple, passed in self.results.items():
            color = Fore.GREEN if passed else Fore.RED
            table.append([
                f"{color}{Style.BRIGHT}{self.values[dim][idx]}{Style.RESET_ALL}"
                for dim, idx in enumerate(config_tuple)
            ])
        self.print_table(table)