#!/usr/bin/env python3
from __future__ import annotations
import datetime
import json
import os
import shutil
from pathlib import Path
from typing import Any, Callable, cast, Dict
from urllib.request import urlopen

REPO_ROOT = Path(__file__).resolve().parent.parent.parent


def get_disabled_issues() -> list[str]:
    # Issue numbers listed in the REENABLED_ISSUES environment variable
    # (comma-separated) have been manually re-enabled, so they should be
    # skipped when building the disabled-test set below.
    reenabled_issues = os.getenv("REENABLED_ISSUES", "")
    issue_numbers = reenabled_issues.split(",") if reenabled_issues else []
    print("Ignoring re-enabled issues: ", issue_numbers)
    return issue_numbers

DISABLED_TESTS_FILE = ".pytorch-disabled-tests.json"
ADDITIONAL_CI_FILES_FOLDER = Path(".additional_ci_files")
TEST_TIMES_FILE = "test-times.json"
TEST_CLASS_TIMES_FILE = "test-class-times.json"
TEST_FILE_RATINGS_FILE = "test-file-ratings.json"
TEST_CLASS_RATINGS_FILE = "test-class-ratings.json"
TD_HEURISTIC_PROFILING_FILE = "td_heuristic_profiling.json"
TD_HEURISTIC_HISTORICAL_EDITED_FILES = "td_heuristic_historical_edited_files.json"
TD_HEURISTIC_PREVIOUSLY_FAILED = "previous_failures.json"
TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL = "previous_failures_additional.json"
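# Cached files are treated as fresh for this long. Note that .seconds is only
# correct here because the delta is under one day (3 hours -> 10800 seconds).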
FILE_CACHE_LIFESPAN_SECONDS = datetime.timedelta(hours=3).seconds


def fetch_and_cache(
    dirpath: str | Path,
    name: str,
    url: str,
    process_fn: Callable[[dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
    """
    Fetch url, transform the JSON payload with process_fn, and cache the
    result at dirpath/name so it can be shared between test processes.
    """
    Path(dirpath).mkdir(exist_ok=True)
    path = os.path.join(dirpath, name)
    print(f"Downloading {url} to {path}")

    def is_cached_file_valid() -> bool:
        # Check if the file is new enough (see: FILE_CACHE_LIFESPAN_SECONDS). A real
        # check could make a HEAD request and check/store the file's ETag
        fname = Path(path)
        now = datetime.datetime.now()
        mtime = datetime.datetime.fromtimestamp(fname.stat().st_mtime)
        diff = now - mtime
        return diff.total_seconds() < FILE_CACHE_LIFESPAN_SECONDS

    if os.path.exists(path) and is_cached_file_valid():
        # Another test process already downloaded the file, so don't re-do it
        with open(path) as f:
            return cast(Dict[str, Any], json.load(f))

    for _ in range(3):
        try:
            contents = urlopen(url, timeout=5).read().decode("utf-8")
            processed_contents = process_fn(json.loads(contents))
            with open(path, "w") as f:
                f.write(json.dumps(processed_contents))
            return processed_contents
        except Exception as e:
            print(f"Could not download {url} because: {e}.")
    print(f"All retries exhausted, downloading {url} failed.")
    return {}
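
# A minimal usage sketch of fetch_and_cache (illustrative only -- the file
# name and URL below are hypothetical): an identity process_fn caches the raw
# JSON, and a second call within FILE_CACHE_LIFESPAN_SECONDS reads the cached
# copy back instead of re-downloading.
#
#     stats = fetch_and_cache(
#         REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER,  # cache directory
#         "example-stats.json",                    # hypothetical cache file name
#         "https://example.com/stats.json",        # hypothetical URL
#         lambda x: x,                             # keep the payload as-is
#     )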


def get_test_times() -> dict[str, dict[str, float]]:
    return get_from_test_infra_generated_stats(
        "test-times.json",
        TEST_TIMES_FILE,
        "Couldn't download test times...",
    )


def get_test_class_times() -> dict[str, dict[str, float]]:
    return get_from_test_infra_generated_stats(
        "test-class-times.json",
        TEST_CLASS_TIMES_FILE,
        "Couldn't download test class times...",
    )


def get_disabled_tests(
    dirpath: str, filename: str = DISABLED_TESTS_FILE
) -> dict[str, Any] | None:
    def process_disabled_test(the_response: dict[str, Any]) -> dict[str, Any]:
        # Remove re-enabled tests and condense even further by getting rid of pr_num
        disabled_issues = get_disabled_issues()
        disabled_test_from_issues = {}
        for test_name, (pr_num, link, platforms) in the_response.items():
            if pr_num not in disabled_issues:
                disabled_test_from_issues[test_name] = (
                    link,
                    platforms,
                )
        return disabled_test_from_issues

    try:
        url = "https://ossci-metrics.s3.amazonaws.com/disabled-tests-condensed.json?versionId=PhiMB7EP3187qvpKvnORewoK3InOIvX5"
        return fetch_and_cache(dirpath, filename, url, process_disabled_test)
    except Exception:
        print("Couldn't download test skip set, leaving all tests enabled...")
        return {}
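
# For reference, process_disabled_test assumes the condensed JSON maps each
# test name to a (pr_num, link, platforms) triple, e.g. (the values below are
# made up):
#
#     {
#         "test_foo (__main__.TestBar)": [
#             "12345",
#             "https://github.com/pytorch/pytorch/issues/12345",
#             ["linux"],
#         ]
#     }
#
# and condenses it to test_name -> (link, platforms), dropping any test whose
# issue number appears in REENABLED_ISSUES.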


def get_test_file_ratings() -> dict[str, Any]:
    return get_from_test_infra_generated_stats(
        "file_test_rating.json",
        TEST_FILE_RATINGS_FILE,
        "Couldn't download test file ratings file, not reordering...",
    )


def get_test_class_ratings() -> dict[str, Any]:
    return get_from_test_infra_generated_stats(
        "file_test_class_rating.json",
        TEST_CLASS_RATINGS_FILE,
        "Couldn't download test class ratings file, not reordering...",
    )


def get_td_heuristic_historial_edited_files_json() -> dict[str, Any]:
    return get_from_test_infra_generated_stats(
        "td_heuristic_historical_edited_files.json",
        TD_HEURISTIC_HISTORICAL_EDITED_FILES,
        "Couldn't download td_heuristic_historical_edited_files.json, not reordering...",
    )


def get_td_heuristic_profiling_json() -> dict[str, Any]:
    return get_from_test_infra_generated_stats(
        "td_heuristic_profiling.json",
        TD_HEURISTIC_PROFILING_FILE,
        "Couldn't download td_heuristic_profiling.json, not reordering...",
    )


def copy_pytest_cache() -> None:
    original_path = REPO_ROOT / ".pytest_cache/v/cache/lastfailed"
    if not original_path.exists():
        return
    shutil.copyfile(
        original_path,
        REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER / TD_HEURISTIC_PREVIOUSLY_FAILED,
    )


def copy_additional_previous_failures() -> None:
    original_path = (
        REPO_ROOT / ".pytest_cache" / TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL
    )
    if not original_path.exists():
        return
    shutil.copyfile(
        original_path,
        REPO_ROOT
        / ADDITIONAL_CI_FILES_FOLDER
        / TD_HEURISTIC_PREVIOUSLY_FAILED_ADDITIONAL,
    )


def get_from_test_infra_generated_stats(
    from_file: str, to_file: str, failure_explanation: str
) -> dict[str, Any]:
    url = f"https://raw.githubusercontent.com/pytorch/test-infra/generated-stats/stats/{from_file}"
    try:
        return fetch_and_cache(
            REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER, to_file, url, lambda x: x
        )
    except Exception:
        print(failure_explanation)
        return {}
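
# A minimal, hypothetical driver showing how CI might pull everything in one
# pass. This module does not ship a __main__ entry point; the call order below
# is illustrative only:
#
#     if __name__ == "__main__":
#         get_test_times()
#         get_test_class_times()
#         get_disabled_tests(str(REPO_ROOT / ADDITIONAL_CI_FILES_FOLDER))
#         get_test_file_ratings()
#         get_test_class_ratings()
#         get_td_heuristic_historial_edited_files_json()
#         get_td_heuristic_profiling_json()
#         copy_pytest_cache()
#         copy_additional_previous_failures()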