1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
|
from __future__ import annotations
import argparse
import concurrent.futures
import json
import logging
import os
import sys
from enum import Enum
from typing import Any, NamedTuple
IS_WINDOWS: bool = os.name == "nt"
def eprint(*args: Any, **kwargs: Any) -> None:
print(*args, file=sys.stderr, flush=True, **kwargs)
class LintSeverity(str, Enum):
ERROR = "error"
WARNING = "warning"
ADVICE = "advice"
DISABLED = "disabled"
class LintMessage(NamedTuple):
path: str | None
line: int | None
char: int | None
code: str
severity: LintSeverity
name: str
original: str | None
replacement: str | None
description: str | None
def check_file(filename: str) -> list[LintMessage]:
with open(filename, "rb") as f:
original = f.read().decode("utf-8")
replacement = ""
with open(filename) as f:
lines = f.readlines()
for line in lines:
if len(line.strip()) > 0:
replacement += line
replacement += "\n" * 3
replacement = replacement[:-3]
if replacement == original:
return []
return [
LintMessage(
path=filename,
line=None,
char=None,
code="MERGE_CONFLICTLESS_CSV",
severity=LintSeverity.WARNING,
name="format",
original=original,
replacement=replacement,
description="Run `lintrunner -a` to apply this patch.",
)
]
def main() -> None:
parser = argparse.ArgumentParser(
description="Format csv files to have 3 lines of space between each line to prevent merge conflicts.",
fromfile_prefix_chars="@",
)
parser.add_argument(
"--verbose",
action="store_true",
help="verbose logging",
)
parser.add_argument(
"filenames",
nargs="+",
help="paths to lint",
)
args = parser.parse_args()
logging.basicConfig(
format="<%(processName)s:%(levelname)s> %(message)s",
level=logging.NOTSET
if args.verbose
else logging.DEBUG
if len(args.filenames) < 1000
else logging.INFO,
stream=sys.stderr,
)
with concurrent.futures.ProcessPoolExecutor(
max_workers=os.cpu_count(),
) as executor:
futures = {executor.submit(check_file, x): x for x in args.filenames}
for future in concurrent.futures.as_completed(futures):
try:
for lint_message in future.result():
print(json.dumps(lint_message._asdict()), flush=True)
except Exception:
logging.critical('Failed at "%s".', futures[future])
raise
if __name__ == "__main__":
main()
|