File: check_c_api_usage.py

package info (click to toggle)
numpy 1%3A2.4.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 87,160 kB
  • sloc: python: 259,644; asm: 232,483; ansic: 213,962; cpp: 160,235; f90: 1,585; sh: 785; fortran: 567; makefile: 443; sed: 139; xml: 109; java: 97; perl: 82; cs: 62; javascript: 53; objc: 33; lex: 13; yacc: 9
file content (265 lines) | stat: -rw-r--r-- 8,593 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import os
import re
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from re import Pattern

"""
Borrow-ref C API linter (Python version).

- Recursively scans source files under --root (default: numpy)
- Matches suspicious CPython C-API calls as whole identifiers
- Skips:
  - lines with '// noqa: borrowed-ref OK' or
    '// noqa: borrowed-ref - manual fix needed'
  - line comments (// ...)
  - block comments (/* ... */), even when they span lines
- Prints findings and exits 1 if any issues found, else 0
"""

# Matches, in one left-to-right pass, the tokens that matter when stripping
# comments from a line of C/C++: a complete string literal, a complete
# character literal, or a comment opener.  Literals are matched first so that
# a '//' or '/*' appearing *inside* quotes is never taken for a comment.
_COMMENT_OR_LITERAL_RX = re.compile(
    r'"(?:\\.|[^"\\])*"'      # complete "..." literal, honouring \-escapes
    r"|'(?:\\.|[^'\\])*'"     # complete '...' literal, honouring \-escapes
    r"|//|/\*"                # line-comment / block-comment opener
)

def strip_comments(line: str, in_block: bool) -> tuple[str, bool]:
    """
    Return (code_without_comments, updated_in_block).

    Removes // line comments and /* ... */ block comments (non-nesting,
    C-style) from a single line of source text.

    Parameters
    ----------
    line : str
        One line of source text (the caller is expected to have removed the
        trailing newline).
    in_block : bool
        True if a /* ... */ comment opened on an earlier line is still open
        when this line starts.

    Returns
    -------
    tuple[str, bool]
        The text of the line that lies outside comments, and whether a block
        comment is still open at the end of the line.

    Notes
    -----
    Comment openers that occur inside a complete string or character literal
    (e.g. ``"http://example"``) are not treated as comments; the literal is
    kept verbatim in the returned code.  Literals are only recognised within
    one line: a quote with no matching closing quote on the same line is
    treated as an ordinary character.  Removed block comments are replaced by
    nothing (not by a space).
    """
    out_parts: list[str] = []
    n = len(line)
    i = 0

    while i < n:
        if in_block:
            end = line.find("*/", i)
            if end == -1:
                # Entire remainder is inside a block comment.
                return ("".join(out_parts), True)
            i = end + 2
            in_block = False
            continue

        # Not in a block comment: find the next comment opener at or after
        # i, stepping over any quoted literals on the way.
        pos = i
        while True:
            opener = _COMMENT_OR_LITERAL_RX.search(line, pos)
            if opener is None or opener.group(0) in ("//", "/*"):
                break
            pos = opener.end()  # a literal: resume scanning after it

        if opener is None:
            # No more comments on this line.
            out_parts.append(line[i:])
            break

        # Keep the code that precedes the comment opener.
        out_parts.append(line[i:opener.start()])
        if opener.group(0) == "//":
            # A line comment swallows the rest of the line.
            return ("".join(out_parts), False)

        # A block comment starts here; the next iteration looks for '*/'.
        i = opener.end()
        in_block = True

    return ("".join(out_parts), in_block)

def iter_source_files(root: Path, exts: set[str], excludes: set[str]) -> list[Path]:
    """
    Collect the source files found anywhere below 'root'.

    A file is selected when its name ends with one of the strings in 'exts'
    (plain suffix matching, so compound extensions such as '.c.src' work as
    well as '.c' or '.h').  Any directory whose name appears in 'excludes'
    is skipped together with everything beneath it.

    The paths are returned in the order os.walk produces them.
    """
    # str.endswith accepts a tuple of candidate suffixes.
    suffixes = tuple(exts)
    found: list[Path] = []

    for current_dir, subdirs, names in os.walk(root):
        # Editing the list in place stops os.walk from descending into
        # the excluded directories.
        subdirs[:] = [name for name in subdirs if name not in excludes]
        base = Path(current_dir)
        found.extend(base / name for name in names if name.endswith(suffixes))

    return found

def build_func_rx(funcs: tuple[str, ...]) -> Pattern[str]:
    """
    Compile a regex that matches any name in 'funcs' as a whole identifier.

    Each name is regex-escaped; the lookarounds reject a match that is
    immediately preceded or followed by a word character, so a name that is
    merely part of a longer identifier does not match.
    """
    alternation = "|".join(re.escape(name) for name in funcs)
    return re.compile(rf"(?<!\w)(?:{alternation})(?!\w)")

def scan_file(
        path: Path,
        func_rx: Pattern[str],
        noqa_markers: tuple[str, ...]
        ) -> list[tuple[str, int, str, str]]:
    """
    Scan a single file.
    Returns list of (func_name, line_number, path_str, raw_line_str).

    Parameters
    ----------
    path : Path
        File to scan.  It is read as UTF-8 with undecodable bytes ignored.
    func_rx : Pattern[str]
        Compiled regex; every match found in the non-comment part of a line
        is reported as a hit.
    noqa_markers : tuple[str, ...]
        Substrings which, when present anywhere in a raw line, suppress all
        hits on that line.

    Returns
    -------
    list[tuple[str, int, str, str]]
        One entry per match: the matched text, the 1-based line number,
        str(path), and the line without its trailing newline.  The list is
        empty if the file does not exist.
    """
    hits: list[tuple[str, int, str, str]] = []
    in_block = False

    try:
        with path.open("r", encoding="utf-8", errors="ignore") as f:
            for lineno, raw in enumerate(f, 1):
                text = raw.rstrip("\n")

                # Always advance the block-comment state, even for lines
                # that end up skipped below: a noqa line may itself open or
                # close a /* ... */ comment, and the following lines must
                # be classified accordingly.
                code, in_block = strip_comments(text, in_block)

                # Skip if approved by noqa markers
                if any(mark in raw for mark in noqa_markers):
                    continue

                # Nothing but comments/whitespace on this line
                if not code.strip():
                    continue

                # Record all suspicious calls in non-comment code
                hits.extend(
                    (m.group(0), lineno, str(path), text)
                    for m in func_rx.finditer(code)
                )
    except FileNotFoundError:
        # File may have disappeared; ignore gracefully
        pass
    return hits


def main(argv: list[str] | None = None) -> int:
    """
    Command-line entry point of the borrow-ref C API linter.

    Parses the options, collects the source files under --root, scans them
    (in a thread pool unless --jobs 1 is given) and reports every suspicious
    call that is not silenced by a noqa marker.

    Parameters
    ----------
    argv : list[str] | None
        Argument list to parse; None makes argparse use sys.argv[1:].

    Returns
    -------
    int
        Exit status: 0 if no suspicious call was found, 1 if at least one
        was found (a report file is then written into the '.tmp' directory
        and, unless --quiet is given, echoed to stdout), 2 if the --root
        directory does not exist.
    """
    # List of suspicious function calls:
    suspicious_funcs: tuple[str, ...] = (
        "PyList_GetItem",
        "PyDict_GetItem",
        "PyDict_GetItemWithError",
        "PyDict_GetItemString",
        "PyDict_SetDefault",
        "PyDict_Next",
        "PyWeakref_GetObject",
        "PyWeakref_GET_OBJECT",
        "PyList_GET_ITEM",
        "_PyDict_GetItemStringWithError",
        "PySequence_Fast"
    )
    func_rx = build_func_rx(suspicious_funcs)
    noqa_markers = (
        "noqa: borrowed-ref OK",
        "noqa: borrowed-ref - manual fix needed"
        )
    default_exts = {".c", ".h", ".c.src", ".cpp"}
    default_excludes = {"pythoncapi-compat"}

    ap = argparse.ArgumentParser(description="Borrow-ref C API linter (Python).")
    ap.add_argument(
        "--quiet",
        action="store_true",
        help="Suppress normal output; exit status alone indicates result (useful\
              for CI).",
    )
    ap.add_argument(
        "-j", "--jobs",
        type=int,
        default=0,
        help="Number of worker threads (0=auto, 1=sequential).",
    )
    ap.add_argument(
        "--root",
        default="numpy",
        type=str,
        help="Root directory to scan (default: numpy)"
        )
    ap.add_argument(
        "--ext",
        action="append",
        default=None,
        help=f"File extension(s) to include (repeatable). Defaults to {default_exts}",
    )
    ap.add_argument(
        "--exclude",
        action="append",
        default=None,
        help=f"Directory name(s) to exclude (repeatable). Default: {default_excludes}",
    )
    args = ap.parse_args(argv)

    if args.ext:
        # Accept both 'c' and '.c' on the command line.
        exts = {e if e.startswith(".") else f".{e}" for e in args.ext}
    else:
        exts = set(default_exts)
    excludes = set(args.exclude) if args.exclude else set(default_excludes)

    root = Path(args.root)
    if not root.exists():
        print(f"error: root '{root}' does not exist", file=sys.stderr)
        return 2

    files = sorted(iter_source_files(root, exts, excludes), key=str)

    # Determine concurrency: auto picks a reasonable cap for I/O-bound work
    if args.jobs is None or args.jobs <= 0:
        max_workers = min(32, (os.cpu_count() or 1) * 5)
    else:
        max_workers = args.jobs
    if not args.quiet:
        # The banner is normal output, so --quiet suppresses it too.
        print(f'Scanning {len(files)} C/C++ source files...\n')

    # Directory that receives the report file
    tmpdir = Path(".tmp")
    tmpdir.mkdir(exist_ok=True)

    # Run the scanning in parallel; only the main thread writes the report
    all_hits: list[tuple[str, int, str, str]] = []
    if max_workers == 1:
        for p in files:
            all_hits.extend(scan_file(p, func_rx, noqa_markers))
    else:
        with ThreadPoolExecutor(max_workers=max_workers) as ex:
            fut_to_file = {ex.submit(scan_file, p, func_rx, noqa_markers):
                           p for p in files}
            for fut in as_completed(fut_to_file):
                try:
                    all_hits.extend(fut.result())
                except Exception as e:
                    # Best effort: report the failure as a diagnostic on
                    # stderr and keep scanning the remaining files.
                    print(f'Failed to scan {fut_to_file[fut]}: {e}',
                          file=sys.stderr)

    # Sort for deterministic output: by path, then line number
    all_hits.sort(key=lambda t: (t[2], t[1]))

    # There are no hits: the linter passed
    if not all_hits:
        if not args.quiet:
            print("All checks passed! C API borrow-ref linter found no issues.\n")
        return 0

    # There are some linter failures: build the report text once, then
    # write it to a log file and (unless --quiet) echo it to stdout.
    report_chunks = ["Running Suspicious C API usage report workflow...\n\n"]
    for func, lineno, pstr, raw in all_hits:
        report_chunks.append(
            f"Found suspicious call to {func} in file: {pstr}\n"
            # Conventional 'path:line:text' location format
            f" -> {pstr}:{lineno}:{raw}\n"
            "Recommendation:\n"
            "If this use is intentional and safe, add "
            "'// noqa: borrowed-ref OK' on the same line "
            "to silence this warning.\n"
            "Otherwise, consider replacing the call "
            "with a thread-safe API function.\n\n"
        )
    report_text = "".join(report_chunks)

    # delete=False: the report must outlive this process so it can be read.
    with tempfile.NamedTemporaryFile(
        prefix="c_api_usage_report.",
        suffix=".txt",
        dir=tmpdir,
        mode="w",
        encoding="utf-8",
        delete=False,
        ) as out:
        report_path = Path(out.name)
        out.write(report_text)

    if not args.quiet:
        sys.stdout.write(report_text)
        print(f"Report written to: {report_path}\n\n"
              "C API borrow-ref linter FAILED.")

    return 1


if __name__ == "__main__":
    # Run the linter and hand main()'s return value to the interpreter as
    # the process exit status.
    raise SystemExit(main())