1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
|
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
"""
A pytest plugin to record test times and then use those times to divide tests
into evenly balanced workloads for each xdist worker.
Two things are hard-coded here that shouldn't be:
- The timing data is written to the tmp directory, but should use the pytest
cache (https://docs.pytest.org/en/latest/how-to/cache.html).
- The number of xdist workers is hard-coded to 8 because I couldn't figure out
how to find the number. Would it be crazy to read the -n argument directly?
You can force some tests to run on the same worker by setting the
`balanced_clumps` setting in your pytest config file. Each line is a substring
of a test name. All tests whose node IDs contain that substring (similar to
-k) will run on the same worker:

    balanced_clumps =
        LongRunningFixture
        some_other_test_substring
"""
import collections
import csv
import os
import shutil
import time
from pathlib import Path
import pytest
import xdist.scheduler
def pytest_addoption(parser):
    """Declare the ``balanced_clumps`` ini setting (pytest calls this automatically).

    The setting is a list of lines, each one a test-name substring.
    """
    ini_name = "balanced_clumps"
    parser.addini(
        ini_name,
        help="Test substrings to assign to the same worker",
        type="linelist",
    )
@pytest.hookimpl(tryfirst=True)
def pytest_configure(config):
    """Create a BalanceXdistPlugin for this config and register it with pytest.

    The plugin is registered under the name "balance_xdist_plugin".
    """
    plugin = BalanceXdistPlugin(config)
    config.pluginmanager.register(plugin, "balance_xdist_plugin")
class BalanceXdistPlugin:  # pragma: debugging
    """The plugin: records test durations and balances xdist workloads.

    Every process appends CSV rows of ``worker, nodeid, phase, seconds`` to
    its own file under ``<startpath>/tmp/tests_csv``.  At the start of the
    next run, the "none" process sums those rows per test, deletes the old
    files, and uses the totals to pre-assign tests to chunks for
    BalancedScheduler.  Nothing is loaded or recorded when ``-k`` is given.
    """

    def __init__(self, config):
        self.config = config
        # True only when no -k expression was given; timing data is read and
        # written only in that case.
        self.running_all = (self.config.getoption("-k") == "")
        # Summed seconds per test nodeid, loaded from the previous run.
        self.times = collections.defaultdict(float)
        # Worker name from PYTEST_XDIST_WORKER, or "none" when it is unset
        # (presumably the xdist controller or a non-xdist run).
        self.worker = os.getenv("PYTEST_XDIST_WORKER", "none")
        # This process's CSV file; set in pytest_sessionstart.
        self.tests_csv = None

    def pytest_sessionstart(self, session):
        """Called once before any tests are run, but in every worker.

        Chooses this process's CSV path.  In the "none" process, also loads
        the durations recorded by the previous run and then removes the
        directory so this run records into an empty one.
        """
        if not self.running_all:
            return
        tests_csv_dir = session.startpath.resolve() / "tmp/tests_csv"
        self.tests_csv = tests_csv_dir / f"{self.worker}.csv"
        if self.worker == "none" and tests_csv_dir.exists():
            for csv_file in tests_csv_dir.iterdir():
                with csv_file.open(newline="") as fcsv:
                    for row in csv.reader(fcsv):
                        # Rows are worker, nodeid, phase, seconds: add up all
                        # phases of a test into one total.
                        self.times[row[1]] += float(row[3])
            shutil.rmtree(tests_csv_dir)

    def write_duration_row(self, item, phase, duration):
        """Append one ``worker, nodeid, phase, duration`` row to our CSV file."""
        if self.running_all:
            self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
            with self.tests_csv.open("a", newline="") as fcsv:
                csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])

    # The timing wrappers below use time.perf_counter(): it is monotonic and
    # high-resolution, so a system clock adjustment during a test cannot
    # produce a bogus (even negative) duration as time.time() differences can.

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_setup(self, item):
        """Time the setup phase of each test."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "setup", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_call(self, item):
        """Time the call phase of each test."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "call", time.perf_counter() - start)

    @pytest.hookimpl(hookwrapper=True)
    def pytest_runtest_teardown(self, item):
        """Time the teardown phase of each test."""
        start = time.perf_counter()
        yield
        self.write_duration_row(item, "teardown", time.perf_counter() - start)

    @pytest.hookimpl(trylast=True)
    def pytest_xdist_make_scheduler(self, config, log):
        """Create our BalancedScheduler using time data from the last run."""
        clumps = config.getini("balanced_clumps")
        # Number of chunks is hard-coded to the expected worker count; see the
        # module docstring.
        test_chunks = self._assign_chunks(self.times, clumps, nchunks=8)
        return BalancedScheduler(config, log, clumps, test_chunks)

    @staticmethod
    def _assign_chunks(times, clumps, nchunks):
        """Map every nodeid in `times` to a chunk number in ``range(nchunks)``.

        Tests matching clump word ``i`` all go to chunk ``i % nchunks``; a test
        matching several clump words belongs to the first one only (the same
        rule BalancedScheduler uses), so its time is counted once.  The
        remaining tests are placed longest-first, each into the chunk with the
        smallest total so far (ties go to the lowest chunk number).
        """
        totals = [0.0] * nchunks
        test_chunks = {}

        # First put the difficult ones all in one worker.
        for i, clump_word in enumerate(clumps):
            chunk = i % nchunks
            for nodeid, t in times.items():
                if clump_word in nodeid and nodeid not in test_chunks:
                    test_chunks[nodeid] = chunk
                    totals[chunk] += t

        # Then assign the rest in descending order of duration.
        rest = sorted(
            ((nodeid, t) for nodeid, t in times.items() if nodeid not in test_chunks),
            key=lambda pair: pair[1],
            reverse=True,
        )
        for nodeid, t in rest:
            lightest = min(range(nchunks), key=totals.__getitem__)
            test_chunks[nodeid] = lightest
            totals[lightest] += t
        return test_chunks
class BalancedScheduler(xdist.scheduler.LoadScopeScheduling):  # pylint: disable=abstract-method # pragma: debugging
    """A balanced-chunk test scheduler for pytest-xdist.

    Grouping is driven by the scope that ``_split_scope`` returns for each
    test; LoadScopeScheduling hands each scope to a single worker.
    """

    def __init__(self, config, log, clumps, test_chunks):
        super().__init__(config, log)
        # Substrings from the "balanced_clumps" ini setting.
        self.clumps = clumps
        # nodeid -> chunk number, for tests timed in a previous run.
        self.test_chunks = test_chunks
        # Scope for untimed tests of each clump.  When timed tests of the clump
        # already have a chunk, new tests join that chunk so the whole clump
        # still shares a worker; otherwise the clump gets its own scope.
        self.clump_scopes = [f"clump{i}" for i in range(len(clumps))]
        seen = set()
        for nodeid, chunk in test_chunks.items():
            i = self._clump_index(nodeid)
            if i is not None and i not in seen:
                self.clump_scopes[i] = chunk
                seen.add(i)

    def _clump_index(self, nodeid):
        """Return the index of the first clump substring in `nodeid`, or None."""
        for i, clump_word in enumerate(self.clumps):
            if clump_word in nodeid:
                return i
        return None

    def _split_scope(self, nodeid):
        """Assign a chunk id to a test node."""
        # If we have a chunk assignment for this node, return it.
        scope = self.test_chunks.get(nodeid)
        if scope is not None:
            return scope
        # If this is a node that should be clumped, put it with its clump.
        i = self._clump_index(nodeid)
        if i is not None:
            return self.clump_scopes[i]
        # Otherwise every node is a separate chunk.
        return nodeid
# Run this with:
# python -c "from tests.balance_xdist_plugin import show_worker_times as f; f()"
def show_worker_times(tests_csv_dir="tmp/tests_csv"):  # pragma: debugging
    """Ad-hoc utility to show data from the last tracked-test run.

    Reads every CSV file in `tests_csv_dir` (rows of worker, nodeid, phase,
    seconds, as written by BalanceXdistPlugin).  For each worker it prints the
    number of tests whose call phase ran and the total recorded seconds, then
    the overall total, per-worker average, and spread between workers.
    """
    times = collections.defaultdict(float)
    tests = collections.defaultdict(int)
    for csv_file in Path(tests_csv_dir).iterdir():
        with csv_file.open(newline="") as fcsv:
            for row in csv.reader(fcsv):
                worker = row[0]
                times[worker] += float(row[3])
                # Count each test once, by its "call" row.
                if row[2] == "call":
                    tests[worker] += 1

    if not times:
        # Avoid dividing by zero / min() of nothing below.
        print(f"No timing data found in {tests_csv_dir}")
        return

    # List every worker that contributes to the statistics below, even one
    # that recorded no "call" rows.
    for worker in sorted(times):
        print(f"{worker}: {tests[worker]:3d} {times[worker]:.2f}")
    total = sum(times.values())
    avg = total / len(times)
    print(f"total: {total:.2f}, avg: {avg:.2f}")
    lo = min(times.values())
    hi = max(times.values())
    print(f"lo = {lo:.2f}; hi = {hi:.2f}; gap = {hi - lo:.2f}; long delta = {hi - avg:.2f}")
|