File: ruff.py

package info (click to toggle)
firefox 146.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 4,653,260 kB
  • sloc: cpp: 7,587,892; javascript: 6,509,455; ansic: 3,755,295; python: 1,410,813; xml: 629,201; asm: 438,677; java: 186,096; sh: 62,697; makefile: 18,086; objc: 13,087; perl: 12,811; yacc: 4,583; cs: 3,846; pascal: 3,448; lex: 1,720; ruby: 1,003; php: 436; lisp: 258; awk: 247; sql: 66; sed: 54; csh: 10; exp: 6
file content (123 lines) | stat: -rw-r--r-- 3,673 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import json
import os
import platform
import re
import signal
import subprocess
import sys

from mozlint import result

# Absolute path of the directory containing this file.
here = os.path.abspath(os.path.dirname(__file__))


def default_bindir():
    """
    Returns the directory under sys.prefix where executables are looked up.

    sys.prefix is used because it gets modified by virtualenv's
    activate_this.py, whereas sys.executable doesn't.
    """
    subdir = "Scripts" if platform.system() == "Windows" else "bin"
    return os.path.join(sys.prefix, subdir)


def get_ruff_version(binary):
    """
    Runs `binary --version` and returns the version number it reports.

    The output of an invocation that exits non-zero is still parsed. When the
    output does not start with "ruff <version>", an error is printed and
    None is returned.
    """
    command = [binary, "--version"]
    try:
        version_text = subprocess.check_output(
            command, stderr=subprocess.STDOUT, text=True
        )
    except subprocess.CalledProcessError as err:
        # A failing exit status still carries whatever the binary printed.
        version_text = err.output

    found = re.match(r"ruff ([0-9\.]+)", version_text)
    if found is None:
        print(f"Error: Could not parse the version '{version_text}'")
        return None
    return found.group(1)


def run_process(config, cmd, **kwargs):
    """
    Runs `cmd` and returns everything it wrote to stdout as text.

    The child's stderr is discarded. `config` and `**kwargs` are accepted so
    existing call sites keep working, but they are not used here.

    If the user interrupts while the output is being collected, the child is
    killed and an empty string is returned.
    """
    # SIGINT is ignored only while the child is spawned; presumably so the
    # child starts with SIGINT ignored and the parent decides how to stop it.
    orig = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        proc = subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True
        )
    finally:
        # Restore the handler even when Popen raises (e.g. binary not found),
        # otherwise Ctrl-C would stay disabled for the rest of the process.
        signal.signal(signal.SIGINT, orig)

    # The context manager closes the pipe and reaps the child on exit, which
    # also covers the killed-child case below.
    with proc:
        try:
            output, _ = proc.communicate()
        except KeyboardInterrupt:
            proc.kill()
            # Always bind a value so the return below cannot fail.
            output = ""

    return output


def lint(paths, config, log, **lintargs):
    """
    Runs `ruff check` on `paths` and converts its JSON report into results.

    `config` is read for the optional "exclude" list (passed to ruff as
    --extend-exclude) and the optional "warning-rules" list (rule-code
    prefixes reported at "warning" level instead of "error").

    `lintargs` is read for "fix" (run a --fix-only pass first) and "warning"
    (when fixing, also fix the warning rules).

    Always returns a dict {"results": [...], "fixed": <int>}. When the ruff
    output is empty or cannot be parsed, "results" is empty but the count from
    the fix pass is still reported.
    """
    fixed = 0
    results = []

    if not paths:
        return {"results": results, "fixed": fixed}

    args = ["ruff", "check", "--force-exclude"] + paths

    if config.get("exclude"):
        args.append(f"--extend-exclude={','.join(config['exclude'])}")

    process_kwargs = {"processStderrLine": log.debug}

    warning_rules = set(config.get("warning-rules", []))
    # str.startswith accepts a tuple; an empty tuple never matches.
    warning_prefixes = tuple(warning_rules)

    if lintargs.get("fix"):
        # Do a first pass with --fix-only as the json format doesn't return
        # the number of fixed issues.
        fix_args = args + ["--fix-only"]
        if warning_rules and not lintargs.get("warning"):
            # Don't fix warnings, to limit unrelated changes sneaking into
            # patches, unless both --fix and -W are passed. The flag is
            # skipped when there are no warning rules so ruff never receives
            # an empty selector; sorting keeps the command line stable.
            fix_args.append(f"--extend-ignore={','.join(sorted(warning_rules))}")

        log.debug(f"Running --fix: {fix_args}")
        output = run_process(config, fix_args, **process_kwargs)
        matches = re.match(r"Fixed (\d+) errors?.", output)
        if matches:
            fixed = int(matches[1])

    args.append("--output-format=json")
    log.debug(f"Running with args: {args}")

    output = run_process(config, args, **process_kwargs)
    if not output:
        # Same shape as every other return, and keeps the fix-pass count.
        return {"results": results, "fixed": fixed}

    try:
        issues = json.loads(output)
    except json.JSONDecodeError:
        log.error(f"could not parse output: {output}")
        return {"results": results, "fixed": fixed}

    for issue in issues:
        code = issue["code"]
        start = issue["location"]
        # `code` can be None, in which case the issue stays an error.
        is_warning = code is not None and code.startswith(warning_prefixes)
        res = {
            "path": issue["filename"],
            "lineno": start["row"],
            "column": start["column"],
            "lineoffset": issue["end_location"]["row"] - start["row"],
            "message": issue["message"],
            "rule": code,
            "level": "warning" if is_warning else "error",
        }

        # Tolerate reports where the "fix" key is absent as well as null.
        fix = issue.get("fix")
        if fix:
            res["hint"] = fix["message"]

        results.append(result.from_config(config, **res))

    return {"results": results, "fixed": fixed}