1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
|
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import os
import platform
import re
import signal
import subprocess
import sys
from mozlint import result
from mozlint.pathutils import expand_exclusions
here = os.path.abspath(os.path.dirname(__file__))
def default_bindir():
# We use sys.prefix to find executables as that gets modified with
# virtualenv's activate_this.py, whereas sys.executable doesn't.
if platform.system() == "Windows":
return os.path.join(sys.prefix, "Scripts")
else:
return os.path.join(sys.prefix, "bin")
def get_black_version(binary):
"""
Returns found binary's version
"""
try:
output = subprocess.check_output(
[binary, "--version"],
stderr=subprocess.STDOUT,
universal_newlines=True,
)
except subprocess.CalledProcessError as e:
output = e.output
try:
# Accept `black.EXE, version ...` on Windows.
# for old version of black, the output is
# black, version 21.4b2
# From black 21.11b1, the output is like
# black, 21.11b1 (compiled: no)
return re.match(r"black.*,( version)? (\S+)", output)[2]
except TypeError as e:
print(f"Could not parse the version '{output}'")
print(f"Error: {e}")
def parse_issues(config, output, paths, *, log):
would_reformat = re.compile("^would reformat (.*)$", re.I)
reformatted = re.compile("^reformatted (.*)$", re.I)
cannot_reformat = re.compile("^error: cannot format (.*?): (.*)$", re.I)
results = []
for l in output.split(b"\n"):
line = l.decode("utf-8").rstrip("\r\n")
if line.startswith("All done!") or line.startswith("Oh no!"):
break
match = would_reformat.match(line)
if match:
res = {"path": match.group(1), "level": "error"}
results.append(result.from_config(config, **res))
continue
match = reformatted.match(line)
if match:
res = {"path": match.group(1), "level": "warning", "message": "reformatted"}
results.append(result.from_config(config, **res))
continue
match = cannot_reformat.match(line)
if match:
res = {"path": match.group(1), "level": "error", "message": match.group(2)}
results.append(result.from_config(config, **res))
continue
log.debug(f"Unhandled line: {line}")
return results
def run_process(config, cmd):
orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
signal.signal(signal.SIGINT, orig)
try:
output, _ = proc.communicate()
proc.wait()
except KeyboardInterrupt:
proc.kill()
return output
def run_black(config, paths, fix=None, *, log, virtualenv_bin_path):
fixed = 0
binary = os.path.join(virtualenv_bin_path or default_bindir(), "black")
log.debug(f"Black version {get_black_version(binary)}")
cmd_args = [binary]
if not fix:
cmd_args.append("--check")
if platform.system() == "Windows":
MAX_PATHS_PER_JOB = (
50 # set a max size to prevent command lines that are too long on Windows
)
chunked_paths = [
paths[i : i + MAX_PATHS_PER_JOB]
for i in range(0, len(paths), MAX_PATHS_PER_JOB)
]
else:
chunked_paths = [paths]
all_output = []
for paths_chunk in chunked_paths:
base_command = cmd_args + paths_chunk
log.debug("Command: {}".format(" ".join(base_command)))
output = parse_issues(
config, run_process(config, base_command), paths_chunk, log=log
)
all_output.extend(output)
# black returns an issue for fixed files as well
for eachIssue in output:
if eachIssue.message == "reformatted":
fixed += 1
return {"results": all_output, "fixed": fixed}
def lint(paths, config, fix=None, **lintargs):
files = list(expand_exclusions(paths, config, lintargs["root"]))
return run_black(
config,
files,
fix=fix,
log=lintargs["log"],
virtualenv_bin_path=lintargs.get("virtualenv_bin_path"),
)
|