1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438
|
import shutil
import argparse
from utils.duckdb_cli import DuckDBCLI
from utils.duckdb_installer import install_assets, install_extensions, make_cli_path, get_version
from utils.test_files_parser import load_test_files
from utils.test_report import TestReport
from utils.logger import make_logger
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock, Event
from os.path import abspath, dirname
import os
import sys
import duckdb
# Module-wide logger, built by the project's `make_logger` helper.
logger = make_logger(__name__)
# Set by `run_one_test_and_log` when a test raises; tests that start afterwards return early.
cancel_event = Event()
# Set to True by `do_run_test` once the test_utils extension versions of both CLIs have been compared.
has_checked_versions = False
def _str_to_bool(value):
    """Convert a command-line value such as "True", "false", "1" or "no" to a bool.

    Without this converter argparse stores the raw string, and any non-empty
    string (including "False") is truthy.

    Raises:
        argparse.ArgumentTypeError: if the value is not a recognized boolean spelling.
    """
    if isinstance(value, bool):
        return value
    normalized = str(value).strip().lower()
    if normalized in ("true", "t", "yes", "y", "1", "on"):
        return True
    if normalized in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean value (True/False), got '{value}'")


# Command-line interface of the BWC test runner.
# Boolean options accept an explicit value (`--dry_run=False`) or can be given
# as a bare flag (`--stop_on_failure`), which means True.
parser = argparse.ArgumentParser(description='Runs the serialization BWC tests')
parser.add_argument(
    '--cleanup_bwc_directory',
    dest='cleanup_bwc_directory',
    action='store',
    nargs='?',
    const=True,
    type=_str_to_bool,
    help='When set, the script will clean up the BWC runtime directory. Use with `dry_run=False` argument to actually delete the files.',
    default=False,
)
parser.add_argument(
    '--dry_run',
    dest='dry_run',
    action='store',
    nargs='?',
    const=True,
    type=_str_to_bool,
    help='When true (the default), the cleanup of the BWC runtime directory only logs the files that would be deleted. Pass `--dry_run=False` to actually delete them.',
    default=True,
)
parser.add_argument(
    '--test_pattern',
    dest='test_pattern',
    action='store',
    help='When set, only tests whose relative path contains the given pattern will be run. Example: `--test_pattern=window` will only run tests that have "window" in their path.',
    default=None,
)
parser.add_argument(
    '--stop_on_failure',
    dest='stop_on_failure',
    action='store',
    nargs='?',
    const=True,
    type=_str_to_bool,
    help='When set, the test runner will stop executing further tests after the first failure. This can be useful when debugging a specific test or when you want to quickly identify if there are any issues without running the entire suite.',
    default=False,
)
parser.add_argument(
    '--run_sequentially',
    dest='run_sequentially',
    action='store',
    nargs='?',
    const=True,
    type=_str_to_bool,
    help='When set, the test runner will execute tests sequentially instead of in parallel. This can be useful for debugging or when running a subset of tests to get more detailed logs.',
    default=False,
)
parser.add_argument(
    '--old_duckdb_version',
    dest='old_duckdb_version',
    action='store',
    help='The old DuckDB version to test against. If not set, tests will be run for all supported versions.',
    default=None,
)
parser.add_argument(
    '--max_workers',
    dest='max_workers',
    action='store',
    help='The maximum number of worker threads to use when running tests in parallel. Default is 24.',
    type=int,
    default=24,
)
parser.add_argument(
    '--new_cli_path',
    dest='new_cli_path',
    action='store',
    help='Path to the new DuckDB CLI to test. If not set, it will default to "build/release/duckdb" in the repo.',
    default=None,
)
parser.add_argument(
    '--test_file',
    dest='test_file',
    action='store',
    help='Path to a test file to run',
    default=None,
)
# Parsed at module level, so `args` is available to the `__main__` block further down.
args = parser.parse_args()
class TestRunnerContext:
    """State shared by all test executions for one old DuckDB version.

    Holds the paths of both CLIs, the number of tests in the run, and the
    two output sinks (summary file and report connection) together with the
    lock that callers take before using them.
    """

    def __init__(self, old_duckdb_version, new_cli_path, nb_tests, summary_file, report_con):
        # The old CLI location is derived from the version by `make_cli_path`.
        self.old_cli_path = make_cli_path(old_duckdb_version)
        self.new_cli_path = new_cli_path
        self.nb_tests = nb_tests
        # Output sinks; writers are expected to hold `report_lock`.
        self.summary_file = summary_file
        self.report_con = report_con
        self.report_lock = Lock()

    def initialize(self):
        """Query both CLIs for their version and store it as `old_cli_v` / `new_cli_v`."""
        old_version_parts = get_version(self.old_cli_path)
        self.old_cli_v = '-'.join(old_version_parts)
        logger.info(f"Old CLI path: {self.old_cli_path} @ {self.old_cli_v}")

        new_version_parts = get_version(self.new_cli_path)
        self.new_cli_v = '-'.join(new_version_parts)
        logger.info(f"New CLI path: {self.new_cli_path} @ {self.new_cli_v}")
def run_multi_clis(clis, c):
    """Execute command `c` on every CLI in `clis`, in order.

    Returns:
        A list with one `execute_command` result per CLI, in the same order as `clis`.
    """
    return [cli.execute_command(c) for cli in clis]
# Substrings that mark a failure as worth retrying (file locks, network issues).
TRANSIENT_ERROR_PATTERNS = [
    "Could not set lock on file",
    "HTTP",
    "Connection",
    "Unable to connect",
]


def is_transient_error(output):
    """Tell whether a failed command output looks like a transient error.

    The "error" and "exception_message" entries of `output` are concatenated
    (missing or empty entries count as ""). The result is True when that text
    is empty or contains any of `TRANSIENT_ERROR_PATTERNS`.
    """
    error_text = output.get("error") or ""
    exception_text = output.get("exception_message") or ""
    message = error_text + exception_text
    # A failure without any message is treated as transient as well.
    if not message:
        return True
    for pattern in TRANSIENT_ERROR_PATTERNS:
        if pattern in message:
            return True
    return False
def run_step(cli, report, func_name, *args):
    """Call the SQL function `func_name` on `cli` and record the outcome in `report`.

    Args:
        cli: object exposing `execute_command(command)` and a `version` attribute.
        report: object exposing `end_step(step_name, output)`.
        func_name: name of the SQL function to `CALL`.
        *args: arguments passed to the function as SQL string literals.

    Returns:
        Whatever `report.end_step` returns for this step.
    """
    # Double embedded single quotes so that an argument containing an apostrophe
    # (e.g. a file path) still forms a valid SQL string literal.
    str_args = ", ".join("'" + str(arg).replace("'", "''") + "'" for arg in args)
    command = f"CALL {func_name}({str_args});"
    output = cli.execute_command(command)
    # Retry on transient errors (file locks, network issues)
    if not output["success"] and is_transient_error(output):
        for attempt in range(1, 4):
            logger.warning(f"[{cli.version}] Transient error, retrying in {attempt}s (attempt {attempt}/3)")
            # Linear back-off: wait 1s, then 2s, then 3s.
            time.sleep(attempt)
            output = cli.execute_command(command)
            if output["success"] or not is_transient_error(output):
                break
    # We had to name the functions `tu_compare_results_from_xxx`
    # in DuckDB 1.1.x to avoid collision and support overloading.
    step_name = "compare_results" if "compare_results" in func_name else func_name
    return report.end_step(step_name, output)
def get_extensions_versions(clis):
    """Return the `test_utils` extension version reported by each CLI in `clis`.

    Raises:
        RuntimeError: if a CLI fails to run the query, or if its output holds
            no version line (the extension is not loaded).
    """
    query = "SELECT extension_version FROM duckdb_extensions() WHERE extension_name='test_utils';"
    versions = []
    for cli, result in zip(clis, run_multi_clis(clis, query)):
        if not result["success"]:
            raise RuntimeError(f"Failed to get extension version: {result['error']}")
        # Find the actual version value, skipping the CSV header and any stale output
        version = next(
            (line for line in result["output"] if line and line != "extension_version"),
            None,
        )
        if not version:
            raise RuntimeError(
                f"test-utils extension not loaded for CLI '{cli.version}' - output: {result['output']}"
            )
        versions.append(version)
    return versions
def do_run_test(ctx, test_spec):
    """Run one BWC test and return its `TestReport`.

    Flow: load the test_utils extension in both CLIs, check once per process
    that both CLIs report the same extension version, serialize the plans and
    results with the old CLI (or reuse cached ones), execute the serialized
    plans with the new CLI, then compare the results with the old CLI.
    The report is returned early as soon as a step returns a falsy value.

    Raises:
        RuntimeError: if the test_utils extension versions of the two CLIs differ.
    """
    global has_checked_versions

    logger.info(f"Running test {test_spec.test_idx}/{ctx.nb_tests}: {test_spec.test_spec_relative_path}")
    report = TestReport(test_spec, ctx.old_cli_v, ctx.new_cli_v)
    # Hard-coded off; when enabled, extra version queries are run on both CLIs.
    sanity_checks = False

    with DuckDBCLI(ctx.old_cli_path, unsigned=True) as old_cli, DuckDBCLI(
        ctx.new_cli_path, unsigned=True
    ) as new_cli:
        both_clis = [old_cli, new_cli]
        ext_path = 'test_utils'
        run_multi_clis(both_clis, f".cd {test_spec.test_runtime_directory}\nLOAD '{ext_path}'")
        logger.debug(f"Loaded extension from '{ext_path}'")

        if sanity_checks:
            run_multi_clis(both_clis, "pragma version;")
            run_multi_clis(
                both_clis,
                "SELECT extension_name, loaded, extension_version FROM duckdb_extensions() WHERE extension_name='test_utils';",
            )

        if not has_checked_versions:
            versions = get_extensions_versions(both_clis)
            if versions[0] != versions[1]:
                # make release EXTENSION_CONFIGS=.github/config/extensions/test-utils.cmake to compile in this repo
                raise RuntimeError(
                    f"test-utils extension version mismatch: {old_cli.version} is @ {versions[0]} and {new_cli.version} is @ {versions[1]}"
                )
            has_checked_versions = True

        # Cleanup output directory in case previous runs left files there
        test_spec.reset_output_directory()

        # First serialize the queries & run them in the first version
        if test_spec.has_cached_serialized_plans:
            # Cached plans: the old results are loaded from file for the comparison.
            compare_func = "tu_compare_results_from_file"
            compare_args = [test_spec.results_new_file_name, test_spec.results_old_file_name]
            report.end_cached_step(
                "serialize_queries_plans", test_spec.queries_file_name, test_spec.serialized_plans_file_name
            )
            report.end_cached_step("serialize_results", test_spec.results_old_file_name)
        else:
            compare_func = "tu_compare_results_from_memory"
            compare_args = [test_spec.results_new_file_name]
            plans_serialized = run_step(
                old_cli,
                report,
                "serialize_queries_plans",
                test_spec.queries_file_name,
                test_spec.serialized_plans_file_name,
            )
            if not plans_serialized:
                return report
            # TODO - merge results in serialized file?
            results_serialized = run_step(old_cli, report, "serialize_results", test_spec.results_old_file_name)
            if not results_serialized:
                return report

        # We need to provide a clean output folder for the other CLI
        test_spec.reset_output_directory()

        # Then execute the plans in the second version
        plans_executed = run_step(
            new_cli,
            report,
            "execute_all_plans_from_file",
            test_spec.serialized_plans_file_name,
            test_spec.results_new_file_name,
        )
        if not plans_executed:
            return report

        # Now compare the results
        run_step(old_cli, report, compare_func, *compare_args)
        report.load_comparison_results()
        return report
def write_summary_file(ctx, to_write):
    """Append `to_write` to the context's summary file and flush it.

    The write and flush happen while holding `ctx.report_lock`.
    """
    sink = ctx.summary_file
    with ctx.report_lock:
        sink.write(to_write)
        sink.flush()
def run_one_test_and_log(ctx, test):
    """Run `test`, log and record its outcome, and return its report.

    Returns None without running anything when `cancel_event` is already set.
    The outcome is appended to the summary file, logged, and inserted into the
    report database. If anything raises, `cancel_event` is set and the
    exception is re-raised.
    """
    if cancel_event.is_set():
        return None
    try:
        report = do_run_test(ctx, test)
        test_label = f"Test {test.test_idx}/{ctx.nb_tests}: {test.test_spec_relative_path}"
        if not report.is_successful():
            em = report.find_exception_message()
            write_summary_file(ctx, f"{test.test_spec_relative_path} # FAILED: {em}\n")
            logger.error(f"{test_label} - ❌ FAILED: {em}")
        else:
            write_summary_file(ctx, f"{test.test_spec_relative_path}\n")
            logger.info(f"{test_label} - SUCCESS")
        # The report connection is only used while holding the lock.
        with ctx.report_lock:
            ctx.report_con.execute(TestReport.report_insert(), report.report_sql_values())
            ctx.report_con.commit()
        return report
    except Exception:
        # Tell tests that have not started yet to skip, then propagate the failure.
        cancel_event.set()
        raise
def cleanup_runtime_dir(bwc_tests_base_dir, dry_run=True):
    """Remove generated artifacts from `<bwc_tests_base_dir>/runtime`.

    Targets are files ending in ".new.result.bin", directories whose name
    starts with "output", and symlinked directories named "data" or "test".

    Args:
        bwc_tests_base_dir: base directory containing the `runtime` folder.
        dry_run: when true, only log what would be deleted; when false, delete.
    """
    runtime_dir = f"{bwc_tests_base_dir}/runtime"
    logger.info(f"Cleaning up BWC directory '{runtime_dir}'")
    delete_list = []
    # Remove new result files, any 'output' dir, and 'data' & 'test' symlinks
    for root, dirs, files in os.walk(runtime_dir):
        delete_list.extend(os.path.join(root, name) for name in files if name.endswith(".new.result.bin"))
        kept_dirs = []
        for dir_name in dirs:
            dir_path = os.path.join(root, dir_name)
            if dir_name.startswith("output") or (os.path.islink(dir_path) and dir_name in ("data", "test")):
                delete_list.append(dir_path)
            else:
                kept_dirs.append(dir_name)
        # Do not descend into directories that are removed as a whole: their
        # content would be listed a second time and skew the reported counts.
        dirs[:] = kept_dirs
    if dry_run:
        logger.info(
            f"{len(delete_list)} files would be deleted - re-run with `--dry_run=False` argument to actually delete them"
        )
        for file_path in delete_list:
            logger.debug(f"  {file_path}")
    else:
        removed = 0
        for file_path in delete_list:
            try:
                # Symlinks (including links to directories) are unlinked, never followed.
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.remove(file_path)
                    removed += 1
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)
                    removed += 1
            except OSError as e:
                # Best effort: report the failure and continue with the next entry.
                logger.error(f"Failed to delete '{file_path}': {e}")
        logger.info(f"Cleanup completed, deleted {removed}/{len(delete_list)} files/directories")
if __name__ == "__main__":
    # Script entry point: optionally clean the runtime directory, otherwise
    # install the requested DuckDB versions, run the BWC tests for each of
    # them, and exit with 0 when every test passed and 1 otherwise.
    supported_duckdb_versions = (
        [args.old_duckdb_version]
        if args.old_duckdb_version
        else [
            "v1.1.0",
            "v1.1.2",
            "v1.1.1",
            "v1.2.0",
            "v1.1.3",
            "v1.2.2",
            "v1.2.1",
            "v1.3.0",
            "v1.3.1",
            "v1.3.2",
            "v1.4.0",
            "v1.4.1",
            "v1.4.2",
            "v1.4.3",
            "v1.4.4",
        ]
    )
    duckdb_root_dir = dirname(dirname(dirname(abspath(__file__))))
    logger.info(f"DuckDB root dir: '{duckdb_root_dir}'")
    bwc_tests_base_dir = f"{duckdb_root_dir}/duckdb_unittest_tempdir/bwc"

    if args.cleanup_bwc_directory:
        cleanup_runtime_dir(bwc_tests_base_dir, dry_run=args.dry_run)
        sys.exit(0)

    # Create the report database
    os.makedirs(f"{bwc_tests_base_dir}/reports", exist_ok=True)
    ts = time.strftime("%Y%m%d_%H%M%S")
    version_suffix = args.old_duckdb_version if args.old_duckdb_version else "multi_version"
    report_db_path = f"{bwc_tests_base_dir}/reports/test_report_{version_suffix}_{ts}.duckdb"
    con = duckdb.connect(report_db_path)
    failed_tests = []
    try:
        con.execute(TestReport.report_schema())
        con.commit()
        logger.info(f"Report database created at '{report_db_path}'")

        # Install DuckDB CLIs and test suites for all supported versions
        with ThreadPoolExecutor(max_workers=10) as executor:
            list(executor.map(lambda version: install_assets(version, bwc_tests_base_dir), supported_duckdb_versions))

        # The CLI under test is the same for every old version.
        new_cli_path = f"{duckdb_root_dir}/build/release/duckdb" if args.new_cli_path is None else args.new_cli_path

        for old_duckdb_version in supported_duckdb_versions:
            logger.info(f"Running tests for DuckDB version '{old_duckdb_version}'")
            # Counters are per version: both are reset here so that the summary
            # logged for this version stays consistent in sequential and parallel mode.
            nb_tests_run = 0
            nb_success = 0
            summary_file_path = f"{bwc_tests_base_dir}/tests_summary_{old_duckdb_version}.txt"
            with open(summary_file_path, 'a') as summary_file:
                summary_file.write(f"\n## --- Starting run at {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n")
                summary_file.flush()
                test_pattern = args.test_pattern
                single_test_file = args.test_file
                res = load_test_files(
                    duckdb_root_dir, bwc_tests_base_dir, old_duckdb_version, test_pattern, single_test_file
                )
                tests = res['tests']
                runner_context = TestRunnerContext(old_duckdb_version, new_cli_path, len(tests), summary_file, con)
                runner_context.initialize()
                extensions = res['needed_extensions']
                extensions.add('test_utils')
                install_extensions(runner_context.old_cli_path, extensions)
                install_extensions(runner_context.new_cli_path, extensions)

                # A test pattern implies a sequential run.
                run_sequentially = args.run_sequentially or test_pattern is not None
                # Only honoured in sequential mode; the parallel branch does not read it.
                stop_on_failure = args.stop_on_failure
                start_time = time.time()
                if run_sequentially:
                    for test in tests:
                        report = run_one_test_and_log(runner_context, test)
                        nb_tests_run += 1
                        if report.is_successful():
                            nb_success += 1
                        else:
                            failed_tests.append((old_duckdb_version, report.test_relative_path))
                            if stop_on_failure:
                                break
                            elif test_pattern is not None:
                                report.log_errors()
                                report.log_steps_queries()
                else:
                    with ThreadPoolExecutor(max_workers=args.max_workers) as executor:
                        reports = executor.map(lambda test: run_one_test_and_log(runner_context, test), tests)
                        reports_list = list(reports)
                    nb_tests_run = len(reports_list)
                    nb_success = sum(1 for r in reports_list if r.is_successful())
                    for r in reports_list:
                        if not r.is_successful():
                            failed_tests.append((old_duckdb_version, r.test_relative_path))

            elapsed = time.time() - start_time
            tps = nb_tests_run / elapsed if elapsed > 0 else 0
            nb_failed = nb_tests_run - nb_success
            logger.info(
                f"All tests completed in {elapsed:.2f}s - {nb_tests_run} tests run, {nb_success} successful, {nb_failed} failed - {tps:.2f} tests per second."
            )
    finally:
        # Release the report database whether or not the run succeeded.
        con.close()

    if not failed_tests:
        logger.info("All tests passed successfully! 🎉")
        sys.exit(0)

    # List the failures grouped by version (entries were appended version by version).
    last_version = None
    for version, test_path in failed_tests:
        if version != last_version:
            logger.info(f"Failed tests for DuckDB version '{version}':")
            logger.info(f"{'-'*40}")
            last_version = version
        logger.info(test_path)
    sys.exit(1)
|