1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
#!/usr/bin/env python3
"""
Test ownership was introduced in https://github.com/pytorch/pytorch/issues/66232.
This lint verifies that every Python test file (file that matches test_*.py or *_test.py in the test folder)
has valid ownership information in a comment header. Valid means:
- The format of the header follows the pattern "# Owner(s): ["list", "of owner", "labels"]
- Each owner label actually exists in PyTorch
- Each owner label starts with "module: " or "oncall: " or is in ACCEPTABLE_OWNER_LABELS
"""
from __future__ import annotations
import argparse
import json
import urllib.error
from enum import Enum
from typing import Any, NamedTuple
from urllib.request import urlopen
LINTER_CODE = "TESTOWNERS"
class LintSeverity(str, Enum):
ERROR = "error"
WARNING = "warning"
ADVICE = "advice"
DISABLED = "disabled"
class LintMessage(NamedTuple):
path: str | None
line: int | None
char: int | None
code: str
severity: LintSeverity
name: str
original: str | None
replacement: str | None
description: str | None
# Team/owner labels usually start with "module: " or "oncall: ", but the following are acceptable exceptions
ACCEPTABLE_OWNER_LABELS = ["NNC", "high priority"]
OWNERS_PREFIX = "# Owner(s): "
def get_pytorch_labels() -> Any:
url = "https://ossci-metrics.s3.amazonaws.com/pytorch_labels.json"
try:
labels = urlopen(url).read().decode("utf-8")
except urllib.error.URLError:
# This is an FB-only hack, if the json isn't available we may
# need to use a forwarding proxy to get out
proxy_url = "http://fwdproxy:8080"
proxy_handler = urllib.request.ProxyHandler(
{"http": proxy_url, "https": proxy_url}
)
context = urllib.request.build_opener(proxy_handler)
labels = context.open(url).read().decode("utf-8")
return json.loads(labels)
PYTORCH_LABELS = get_pytorch_labels()
# Team/owner labels usually start with "module: " or "oncall: ", but the following are acceptable exceptions
ACCEPTABLE_OWNER_LABELS = ["NNC", "high priority"]
GLOB_EXCEPTIONS = ["**/test/run_test.py"]
def check_labels(
labels: list[str], filename: str, line_number: int
) -> list[LintMessage]:
lint_messages = []
for label in labels:
if label not in PYTORCH_LABELS:
lint_messages.append(
LintMessage(
path=filename,
line=line_number,
char=None,
code=LINTER_CODE,
severity=LintSeverity.ERROR,
name="[invalid-label]",
original=None,
replacement=None,
description=(
f"{label} is not a PyTorch label "
"(please choose from https://github.com/pytorch/pytorch/labels)"
),
)
)
if label.startswith(("module:", "oncall:")) or label in ACCEPTABLE_OWNER_LABELS:
continue
lint_messages.append(
LintMessage(
path=filename,
line=line_number,
char=None,
code=LINTER_CODE,
severity=LintSeverity.ERROR,
name="[invalid-owner]",
original=None,
replacement=None,
description=(
f"{label} is not an acceptable owner "
"(please update to another label or edit ACCEPTABLE_OWNERS_LABELS "
"in tools/linters/adapters/testowners_linter.py"
),
)
)
return lint_messages
def check_file(filename: str) -> list[LintMessage]:
lint_messages = []
has_ownership_info = False
with open(filename) as f:
for idx, line in enumerate(f):
if not line.startswith(OWNERS_PREFIX):
continue
has_ownership_info = True
labels = json.loads(line[len(OWNERS_PREFIX) :])
lint_messages.extend(check_labels(labels, filename, idx + 1))
if has_ownership_info is False:
lint_messages.append(
LintMessage(
path=filename,
line=None,
char=None,
code=LINTER_CODE,
severity=LintSeverity.ERROR,
name="[no-owner-info]",
original=None,
replacement=None,
description="Missing a comment header with ownership information.",
)
)
return lint_messages
def main() -> None:
parser = argparse.ArgumentParser(
description="test ownership linter",
fromfile_prefix_chars="@",
)
parser.add_argument(
"filenames",
nargs="+",
help="paths to lint",
)
args = parser.parse_args()
lint_messages = []
for filename in args.filenames:
lint_messages.extend(check_file(filename))
for lint_message in lint_messages:
print(json.dumps(lint_message._asdict()), flush=True)
if __name__ == "__main__":
main()
|