1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
#!/usr/bin/env python3
# ===--- perf_test_driver.py ---------------------------------------------===//
#
# This source file is part of the Swift.org open source project
#
# Copyright (c) 2014 - 2017 Apple Inc. and the Swift project authors
# Licensed under Apache License v2.0 with Runtime Library Exception
#
# See https://swift.org/LICENSE.txt for license information
# See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
#
# ===---------------------------------------------------------------------===//
import functools
import glob
import multiprocessing
import os
import platform
import re
import subprocess
# Matches one line of a benchmark executable's "--list" output: leading
# digits, a comma, then a comma-free field captured as group 1. The driver
# uses group 1 as the test name.
BENCHMARK_OUTPUT_RE = re.compile(r"\d+,([^,]+)")
class Result(object):
    """Outcome of one benchmark test, classified against an xfail list.

    ``status`` is a failure indicator: a truthy value means the test failed.
    A test counts as an expected failure when any pattern in ``xfail_list``
    matches the beginning of its name (``re.match`` semantics).
    """

    def __init__(self, name, status, output, xfail_list):
        """Store the raw outcome and decide whether the test is xfailed.

        Args:
            name: Name of the test.
            status: Truthy when the test failed, falsy when it succeeded.
            output: Output associated with the test; stored unmodified.
            xfail_list: Iterable of regular expressions naming expected
                failures.
        """
        self.name = name
        self.status = status
        self.output = output
        self.is_xfailed = any(
            re.match(pattern, name) is not None for pattern in xfail_list
        )

    def is_failure(self):
        """Return True for an unexpected outcome ("FAIL" or "XPASS")."""
        return self.get_result() in ("FAIL", "XPASS")

    def get_result(self):
        """Return "PASS", "FAIL", "XFAIL" or "XPASS" for this test."""
        failed = bool(self.status)
        if self.is_xfailed:
            return "XFAIL" if failed else "XPASS"
        return "FAIL" if failed else "PASS"

    def get_name(self):
        """Return the test name."""
        return self.name

    def merge_in_extra_data(self, d):
        """Return the extra-data dict ``d`` unchanged; nothing is merged."""
        return d

    def print_data(self, max_test_len):
        """Print the name and result as one row.

        The name column is left-aligned and ``max_test_len + 5`` wide.
        """
        column_width = max_test_len + 5
        row_format = "{:<%d}{:}" % column_width
        print(row_format.format(self.get_name(), self.get_result()))
def run_with_timeout(func, args, timeout_seconds=10 * 60):
    """Call ``func(*args)`` on a helper thread, waiting a bounded time.

    Args:
        func: Callable to invoke.
        args: Tuple of positional arguments passed to ``func``.
        timeout_seconds: Maximum time to wait for the result. Defaults to
            10 minutes.

    Returns:
        Whatever ``func(*args)`` returns.

    Raises:
        RuntimeError: If no result is available within ``timeout_seconds``.
        Exception: Any exception raised by ``func`` is re-raised here.
    """
    # We just use this to create a timeout since we use an older python. Once
    # we update to use python >= 3.3, use the timeout API on communicate
    # instead.
    import multiprocessing.dummy

    pool = multiprocessing.dummy.Pool(1)
    try:
        pending = pool.apply_async(func, args=args)
        return pending.get(timeout_seconds)
    except multiprocessing.TimeoutError:
        raise RuntimeError(
            "Child process aborted due to timeout. "
            "Timeout: %s seconds" % timeout_seconds
        )
    finally:
        # Shut the pool down on every path (success, error and timeout) so
        # its helper threads are not leaked once per call.
        pool.terminate()
def _unwrap_self(args):
    """Invoke ``process_input`` for an ``(instance, data)`` argument tuple.

    The method is looked up on the type of ``args[0]`` and called with the
    unpacked tuple, so ``args[0]`` is passed as ``self``. BenchmarkDriver
    hands this module-level function to ``Pool.map_async``.
    """
    instance = args[0]
    process = type(instance).process_input
    return process(*args)
def get_benchmark_executable(binary_dir, opt_level):
    """Return the single benchmark executable for ``opt_level``.

    The executable is found by globbing ``binary_dir`` for
    ``Benchmark_<opt_level>-<platform.machine()>*``.

    Raises:
        ValueError: If the pattern matches no file or more than one file.
    """
    file_name_glob = "Benchmark_" + opt_level + "-" + platform.machine() + "*"
    pattern = os.path.join(binary_dir, file_name_glob)
    matches = glob.glob(pattern)
    if len(matches) == 1:
        return matches[0]
    if not matches:
        raise ValueError(
            "No benchmark executable for file name pattern " +
            pattern + " found")
    # More than one candidate: refuse to pick one and list them all.
    raise ValueError(
        "Multiple benchmark executables for file name pattern " +
        pattern + " found\n" +
        str(matches))
# Optimization levels exercised by default. BenchmarkDriver resolves each one
# to an executable through get_benchmark_executable().
BenchmarkDriver_OptLevels = ["Onone", "O", "Osize"]


class BenchmarkDriver(object):
    """Base class that runs every listed test of each benchmark executable.

    Subclasses must override ``prepare_input`` and ``process_input``.
    ``process_input`` is expected to return an object offering
    ``is_failure()``, ``get_name()``, ``merge_in_extra_data()`` and
    ``print_data()``, as ``Result`` does.
    """

    def __init__(
        self,
        binary_dir,
        xfail_list,
        enable_parallel=False,
        opt_levels=BenchmarkDriver_OptLevels,
    ):
        """Resolve one executable per optimization level.

        Args:
            binary_dir: Directory searched for the benchmark executables.
            xfail_list: Stored as ``self.xfail_list``; not read by this base
                class (presumably consumed by subclasses -- confirm).
            enable_parallel: When true, tests are processed in a
                ``multiprocessing.Pool`` instead of sequentially.
            opt_levels: Optimization levels to run; only iterated, never
                mutated.

        Raises:
            ValueError: Propagated from ``get_benchmark_executable`` when an
                executable cannot be resolved unambiguously.
        """
        self.targets = [
            (get_benchmark_executable(binary_dir, o), o) for o in opt_levels
        ]
        self.xfail_list = xfail_list
        self.enable_parallel = enable_parallel
        self.data = None

    def print_data_header(self, max_test_len):
        """Print the "Name"/"Result" header row of the results table."""
        fmt = "{:<%d}{:}" % (max_test_len + 5)
        print(fmt.format("Name", "Result"))

    def prepare_input(self, name, opt_level=None):
        """Return a dict of extra per-test input; must be overridden.

        The driver calls this with the test name only, so ``opt_level`` is
        optional. Without the default, the base method would fail with a
        ``TypeError`` rather than the intended "Abstract method" error.
        """
        raise RuntimeError("Abstract method")

    def process_input(self, data):
        """Run one test described by the ``data`` dict; must be overridden."""
        raise RuntimeError("Abstract method")

    def run_for_opt_level(self, binary, opt_level, test_filter):
        """Run all (optionally filtered) tests of one benchmark executable.

        Args:
            binary: Path of the benchmark executable.
            opt_level: Optimization level recorded in each test's input dict.
            test_filter: Optional regular expression; only test names it
                matches from the start (``re.match``) are run.

        Returns:
            A dict with the keys ``"result"`` (list of per-test results),
            ``"has_failure"``, ``"max_test_len"`` and ``"extra_data"``.
        """
        print("testing driver at path: %s" % binary)
        names = []
        output = subprocess.check_output([binary, "--list"], universal_newlines=True)
        # The first line of the listing is skipped (presumably a header row).
        for line in output.split("\n")[1:]:
            m = BENCHMARK_OUTPUT_RE.match(line)
            if m is None:
                continue
            names.append(m.group(1))
        if test_filter:
            regex = re.compile(test_filter)
            names = [n for n in names if regex.match(n)]

        def prepare_input_wrapper(name):
            x = {"opt": opt_level, "path": binary, "test_name": name}
            x.update(self.prepare_input(name))
            return x

        prepared_input = [prepare_input_wrapper(n) for n in names]
        results = None
        if self.enable_parallel:
            pool = multiprocessing.Pool()
            try:
                z = zip([self] * len(prepared_input), prepared_input)
                results = pool.map_async(_unwrap_self, z).get(999999)
            finally:
                # Always release the worker processes, even when a test
                # raised, so repeated runs do not accumulate idle workers.
                pool.terminate()
                pool.join()
        else:
            results = map(self.process_input, prepared_input)

        def reduce_results(acc, r):
            acc["result"].append(r)
            acc["has_failure"] = acc["has_failure"] or r.is_failure()
            acc["max_test_len"] = max(acc["max_test_len"], len(r.get_name()))
            acc["extra_data"] = r.merge_in_extra_data(acc["extra_data"])
            return acc

        return functools.reduce(
            reduce_results,
            results,
            {"result": [], "has_failure": False, "max_test_len": 0, "extra_data": {}},
        )

    def print_data(self, data, max_test_len):
        """Print the header followed by one row per collected result."""
        print("Results:")
        self.print_data_header(max_test_len)
        for d in data:
            for r in d["result"]:
                r.print_data(max_test_len)

    def run(self, test_filter=None):
        """Run every target, print the results table and report success.

        Returns:
            True when no test of any target reported a failure. This is
            also the result when there are no targets at all.
        """
        self.data = [
            self.run_for_opt_level(binary, opt_level, test_filter)
            for binary, opt_level in self.targets
        ]
        # Explicit initial values keep an empty target list from raising.
        max_test_len = functools.reduce(
            max, (d["max_test_len"] for d in self.data), 0
        )
        has_failure = any(d["has_failure"] for d in self.data)
        self.print_data(self.data, max_test_len)
        return not has_failure
|