1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
#!/usr/bin/env python3
# The following script requires Python 3.9 or higher
import argparse
import json
import subprocess
from typing import Callable, TypedDict, Literal, Union
class SpreadLogDetail(TypedDict):
    """Detail payload carried by an entry of the parsed spread log."""

    # Text lines recorded for the entry.
    lines: list[str]
class SpreadLog_TypePhase(TypedDict):
    """Parsed spread log entry whose ``type`` field is ``"phase"``."""

    # Discriminator: always the literal string "phase" for this entry kind.
    type: Literal["phase"]
    # Name of the task the phase entry refers to.
    task: str
    verb: str  # log_helper.ExecutionPhase
    # Extra text lines attached to the entry.
    detail: SpreadLogDetail
class SpreadLog_TypeResult(TypedDict):
    """Parsed spread log entry whose ``type`` field is ``"result"``."""

    # Discriminator: always the literal string "result" for this entry kind.
    type: Literal["result"]
    result_type: str  # log_helper.Result
    level: str  # log_helper.ExecutionLevel
    # Stage the result applies to (compared against e.g. "prepare"/"restore").
    stage: str
    # Extra text lines attached to the entry.
    detail: SpreadLogDetail
# A single entry of the parsed spread log: either a "phase" entry or a
# "result" entry, told apart by the value of its "type" key.
SpreadLog = Union[SpreadLog_TypePhase, SpreadLog_TypeResult]
def filter_with_spread(exec_param: list[str]) -> list[str]:
    """Return the lines printed by ``spread -list`` for the given parameters.

    The entries of *exec_param* are appended to the ``spread -list`` command
    line.  The command's standard output is decoded as text and split into
    lines.  ``subprocess.check_output`` raises ``CalledProcessError`` when the
    command exits with a non-zero status.
    """
    command = ["spread", "-list", *exec_param]
    listing = subprocess.check_output(command, text=True)
    return listing.splitlines()
def list_executed_tasks(
    filtered_exec_param: set[str], spread_logs: list[SpreadLog]
) -> set[str]:
    """Return the tasks of *filtered_exec_param* that were executed.

    A task counts as executed when the log contains a ``"phase"`` entry for
    it whose verb is ``"Executing"``.
    """
    started = set()
    for entry in spread_logs:
        if entry["type"] != "phase":
            continue
        if entry["verb"] == "Executing":
            started.add(entry["task"])
    return filtered_exec_param.intersection(started)
def _get_detail_lines(
    spread_logs: list[SpreadLog], log_condition_func: Callable[[SpreadLog], bool]
) -> list[str]:
    """Collect the cleaned detail lines of the log entries accepted by a predicate.

    Every entry of *spread_logs* for which *log_condition_func* returns a true
    value contributes its ``["detail"]["lines"]``, in log order.  Each line is
    stripped of surrounding whitespace, of one leading ``-`` and of the
    whitespace that follows it.
    """
    cleaned = []
    for entry in spread_logs:
        if not log_condition_func(entry):
            continue
        # Each entry in ['detail']['lines'] is a spread task prefaced with a
        # '-' and surrounded by whitespace
        for raw_line in entry["detail"]["lines"]:
            cleaned.append(raw_line.strip().removeprefix("-").strip())
    return cleaned
def list_failed_tasks(
    filtered_exec_param: set[str], spread_logs: list[SpreadLog]
) -> set[str]:
    """Return the tasks of *filtered_exec_param* listed as failed in the log.

    Only ``"result"`` entries whose result type is ``"Failed"`` and whose
    level is ``"tasks"`` are taken into account.
    """

    def is_failed_tasks_result(entry: SpreadLog) -> bool:
        # Check the entry kind first: only "result" entries are asked for
        # their result-specific keys.
        if entry["type"] != "result":
            return False
        return entry["result_type"] == "Failed" and entry["level"] == "tasks"

    reported = _get_detail_lines(spread_logs, is_failed_tasks_result)
    return filtered_exec_param.intersection(reported)
def _list_failed(spread_logs: list[SpreadLog], level: str, stage: str) -> list[str]:
    """Return the cleaned detail lines of failed results for one level and stage.

    An entry is selected when it is a ``"result"`` entry with result type
    ``"Failed"`` whose ``level`` and ``stage`` equal the given values.
    """

    def matches(entry: SpreadLog) -> bool:
        # Short-circuit on the entry kind so that only "result" entries are
        # asked for their result-specific keys.
        if entry["type"] != "result" or entry["result_type"] != "Failed":
            return False
        return entry["level"] == level and entry["stage"] == stage

    return _get_detail_lines(spread_logs, matches)
def list_executed_and_failed(
    filtered_exec_param: set[str], spread_logs: list[SpreadLog]
) -> set[str]:
    """Return failed tasks plus restore failures, excluding prepare failures.

    The failed tasks come from ``list_failed_tasks`` (and are therefore
    limited to *filtered_exec_param*); the ``"task"``/``"restore"`` failures
    are added as reported by the log, and every ``"task"``/``"prepare"``
    failure is then removed from the result.
    """
    prepare_failures = _list_failed(spread_logs, "task", "prepare")
    restore_failures = _list_failed(spread_logs, "task", "restore")
    failed_tasks = list_failed_tasks(filtered_exec_param, spread_logs)
    return failed_tasks.union(restore_failures).difference(prepare_failures)
def list_aborted_tasks(
    filtered_exec_param: set[str], spread_logs: list[SpreadLog]
) -> set[str]:
    """Return the tasks of *filtered_exec_param* that the log does not account for.

    When at least one task was executed, the result is every requested task
    that was not executed.  When no task was executed at all, the result is
    every requested task that is not reported by
    ``list_executed_and_failed``.
    """
    executed = list_executed_tasks(filtered_exec_param, spread_logs)
    if executed:
        return filtered_exec_param.difference(executed)

    # Nothing was executed: fall back to the failure results of the log.
    accounted_for = list_executed_and_failed(filtered_exec_param, spread_logs)
    return filtered_exec_param.difference(accounted_for)
def list_successful_tasks(
    filtered_exec_param: set[str], spread_logs: list[SpreadLog]
) -> set[str]:
    """Return the executed tasks that are not reported as failed.

    A task is dropped from the executed set when it appears among the failed
    tasks or among the ``"task"``/``"restore"`` failures of the log.
    """
    executed = list_executed_tasks(filtered_exec_param, spread_logs)
    failures = list_failed_tasks(filtered_exec_param, spread_logs).union(
        _list_failed(spread_logs, "task", "restore")
    )
    # Removing an empty set of failures leaves the executed set unchanged.
    return executed.difference(failures)
def list_rexecute_tasks(
    exec_param: Union[str, list[str]],
    filtered_exec_param: set[str],
    spread_logs: list[SpreadLog],
) -> set[str]:
    """Return what has to be run again to complete the execution.

    The tasks to re-execute are the aborted tasks plus the tasks of
    *filtered_exec_param* reported by ``list_executed_and_failed``.  When
    that covers every task in *filtered_exec_param*, the original spread
    parameters are returned instead of the individual task names.

    Args:
        exec_param: the spread parameters used for the run, normally the
            already split list of expressions.  A plain string is also
            accepted and is split on commas and whitespace.
        filtered_exec_param: the tasks selected by those parameters.
        spread_logs: the parsed spread log entries.

    Returns:
        The set of task names (or original parameters) to re-execute.
    """
    aborted_tasks = list_aborted_tasks(filtered_exec_param, spread_logs)
    exec_and_failed = list_executed_and_failed(filtered_exec_param, spread_logs)
    # Restore failures are taken from the log as-is, so limit them to the
    # tasks that were actually requested.
    exec_and_failed = exec_and_failed.intersection(filtered_exec_param)
    pending = aborted_tasks.union(exec_and_failed)

    if filtered_exec_param.issubset(pending):
        # Everything has to run again: hand back the original parameters.
        if isinstance(exec_param, str):
            # set() on a bare string would yield its individual characters;
            # tokenise it the same way the command line is tokenised.
            return set(exec_param.replace(",", " ").split())
        return set(exec_param)
    return pending
def add_arguments(parser: argparse.ArgumentParser) -> None:
    """Register the ``exec_params`` and ``parsed_log`` positionals on *parser*.

    ``parsed_log`` is declared with ``argparse.FileType``, so argparse opens
    the given path for reading as UTF-8 text while parsing the arguments.
    """
    positionals = (
        (
            "exec_params",
            {
                "help": "This is the parameter used to run spread (something like this BACKEND:SYSTEM:SUITE)",
            },
        ),
        (
            "parsed_log",
            {
                "type": argparse.FileType("r", encoding="utf-8"),
                "help": "This is the output generated by the log-parser tool",
            },
        ),
    )
    for name, options in positionals:
        parser.add_argument(name, **options)
def main() -> None:
    """Parse the command line and print the requested list of tasks.

    The spread parameters given on the command line are split on commas and
    whitespace and expanded to task names with ``filter_with_spread``.  For
    every command except ``list-all-tasks`` the parsed log is loaded as JSON.
    The resulting tasks are printed on one line, separated by spaces, in
    sorted order so that the output is reproducible between runs.

    Raises:
        RuntimeError: if the parsed log is empty, or if the command is not
            one of the known commands.
    """
    parser = argparse.ArgumentParser(
        description="""
Usage: log-analyzer list-failed-tasks <EXEC-PARAM> <PARSED-LOG>
log-analyzer list-executed-tasks <EXEC-PARAM> <PARSED-LOG>
log-analyzer list-successful-tasks <EXEC-PARAM> <PARSED-LOG>
log-analyzer list-aborted-tasks <EXEC-PARAM> <PARSED-LOG>
log-analyzer list-all-tasks <EXEC-PARAM>
log-analyzer list-reexecute-tasks <EXEC-PARAM> <PARSED-LOG>
The log analyzer is an utility that provides useful information about a spread
execution. The main functionality of the analyzer utility is to determine which tests
have to be re-executed, including aborted tests that are not included in the test results.
The log analyzer uses as input the spread expression that was used to run the tests.
This expression determines which tests to considered. The second input is the output of
the log-parser utility, which generates a json file including all the information
extracted from the raw spread log.
""",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command")
    subparsers.required = True

    # Commands that take both the spread parameters and the parsed log.
    log_command_help = (
        ("list-failed-tasks", "list the tasks that failed during execute"),
        ("list-executed-tasks", "list the tasks that were executed"),
        ("list-successful-tasks", "list the successful tasks"),
        (
            "list-aborted-tasks",
            "list the aborted tasks (needs spread to be installed)",
        ),
        (
            "list-reexecute-tasks",
            "list the tasks to re-execute to complete (includes aborted and failed tasks)",
        ),
    )
    for command_name, command_help in log_command_help:
        add_arguments(subparsers.add_parser(command_name, help=command_help))

    # list-all-tasks only needs the spread parameters.
    list_all = subparsers.add_parser(
        "list-all-tasks", help="list all the tasks")
    list_all.add_argument(
        "exec_params",
        help="This is the parameter used to run spread (something like this BACKEND:SYSTEM:SUITE)",
    )

    args = parser.parse_args()
    exec_params = args.exec_params.replace(",", " ").split()
    filtered_exec_param = set(filter_with_spread(exec_params))

    log = None
    parsed_log = getattr(args, "parsed_log", None)
    if parsed_log is not None:
        # argparse.FileType opened the file during parsing; close it as soon
        # as its content has been read.
        with parsed_log:
            log = json.load(parsed_log)
        if not log:
            raise RuntimeError("log.analyzer: the log file cannot be empty")

    # Commands sharing the (filtered_exec_param, log) call signature.
    log_commands = {
        "list-failed-tasks": list_failed_tasks,
        "list-executed-tasks": list_executed_tasks,
        "list-successful-tasks": list_successful_tasks,
        "list-aborted-tasks": list_aborted_tasks,
    }
    if args.command in log_commands:
        tasks = log_commands[args.command](filtered_exec_param, log)
    elif args.command == "list-all-tasks":
        tasks = filtered_exec_param
    elif args.command == "list-reexecute-tasks":
        tasks = list_rexecute_tasks(exec_params, filtered_exec_param, log)
    else:
        raise RuntimeError("log.analyzer: no such command: %s" % args.command)

    # Sets of strings iterate in an order that can change from run to run;
    # sort so the printed list is deterministic.
    print(" ".join(sorted(tasks)))
# Run the command-line interface only when the file is executed as a script.
if __name__ == "__main__":
    main()
|