File: generate_small_sample.py

package info (click to toggle)
python-executing 2.2.0-0.3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 11,860 kB
  • sloc: python: 10,235; sh: 48; makefile: 10
file content (202 lines) | stat: -rw-r--r-- 5,795 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from .test_main import TestFiles
from pathlib import Path
import hashlib

from pysource_minimize import minimize
import sys
import textwrap
import os
import linecache
from executing import Source
from multiprocessing import get_context
import tempfile
import hashlib
import time
import contextlib
import os
from rich.progress import Progress, track
from rich.syntax import Syntax
from rich.console import Console
import argparse
import ast

# Directory holding samples that failed in previous runs; these are
# re-checked first on the next invocation (see big_samples below).
last_samples_dir = Path(__file__).parent / "last_samples"
last_samples_dir.mkdir(exist_ok=True)


# Destination directory for the minimized reproducer files written by main().
small_samples = Path(__file__).parent / "small_samples"


def source_hash(source_code):
    """Return the hex SHA-256 digest of *source_code* (a str, utf8-encoded)."""
    digest = hashlib.sha256()
    digest.update(source_code.encode("utf8"))
    return digest.hexdigest()


def big_samples(folder):
    """Yield candidate sample files to check.

    Previously failing samples (in ``last_samples_dir``) come first, followed
    by every unique, reasonably sized ``*.py`` file under *folder*.  Files
    that cannot be read, very long files, and duplicate contents (by hash)
    are skipped.
    """
    # Re-check samples which failed in earlier runs before anything else.
    yield from last_samples_dir.rglob("*.py")

    hashes = set()

    for p in folder.rglob("*.py"):
        try:
            content = p.read_text()
        except (OSError, UnicodeDecodeError):
            # fix: a bare `except:` also swallowed KeyboardInterrupt/SystemExit;
            # only unreadable or badly encoded files should be skipped.
            continue

        if content.count("\n") > 50000:
            # Long files take too much time to check and are most likely generated code or repetitive
            continue

        # Deduplicate by content hash so identical copies are checked once.
        h = source_hash(content)
        if h in hashes:
            continue
        hashes.add(h)
        yield p


def test_file(filename: Path) -> bool:
    """Run the executing checks against a single file.

    Returns True when the file cannot even be parsed (nothing to check) or
    when all checks pass; returns False when executing fails on the file.
    """
    code = filename.read_text()

    # Clear caches to avoid accumulating too much data in memory.
    # This is usually not a problem for executing, but this usage scenario is different
    linecache.clearcache()
    for cache_name in ("__source_cache_with_lines", "__executing_cache"):
        if hasattr(Source, cache_name):
            delattr(Source, cache_name)

    test = TestFiles()
    try:
        ast.parse(code)
    except (RecursionError, SyntaxError):
        # Files the compiler itself rejects cannot be checked -> treat as ok.
        return True

    try:
        # The checks are noisy; silence stdout/stderr while they run.
        with open(os.devnull, "w") as dev_null:
            with contextlib.redirect_stderr(dev_null):
                with contextlib.redirect_stdout(dev_null):
                    test.check_filename(filename, check_names=True)
    except Exception:
        # fix: a bare `except:` also caught KeyboardInterrupt/SystemExit,
        # which made the worker processes hard to interrupt. Any ordinary
        # exception still means the check failed for this file.
        return False

    return True


def map_file(filename: Path):
    """Pool worker: pair the check result with the file it belongs to."""
    passed = test_file(filename)
    return passed, filename


def main():
    """Check all sample files and minimize the first failing one.

    Walks tests/last_samples plus the given source folder, runs the
    executing checks over each file in a process pool, and on the first
    failure uses pysource_minimize to shrink the failing source to a
    minimal reproducer, which is stored under tests/small_samples.
    """
    parser = argparse.ArgumentParser(prog="generate_small_samples")
    parser.add_argument("source_folder", default="tests/samples", nargs="?")

    args = parser.parse_args()

    folder = Path(args.source_folder)

    if not (folder.exists() and folder.is_dir()):
        print("source_folder has to be an existing directory")
        # fix: use sys.exit instead of the interactive-only exit() builtin
        sys.exit(1)

    console = Console()

    console.print()
    console.print(f"Check files in tests/last_samples and {folder}:")
    console.print()

    with Progress() as progress:

        task_collect = progress.add_task(description="collect files ...", total=None)

        # "spawn" gives each worker a fresh interpreter; maxtasksperchild
        # recycles workers so per-process caches cannot grow without bound.
        with get_context("spawn").Pool(maxtasksperchild=100) as p:
            files = list(
                progress.track(
                    big_samples(folder),
                    task_id=task_collect,
                    description="collect files...",
                )
            )
            progress.reset(task_collect, description="check files...", total=len(files))

            for result, filename in progress.track(
                p.imap_unordered(map_file, files), task_id=task_collect
            ):
                # An externally created "break_generate" marker file aborts
                # the run cleanly (and is consumed so the next run proceeds).
                break_file = Path(__file__).parent / "break_generate"
                if break_file.exists():
                    break_file.unlink()
                    sys.exit(0)

                if not result:
                    # fix: the f-string had no placeholder; report which file failed
                    print(f"{filename} is failing the tests -> minimize\n")
                    failing_code = filename.read_text()
                    break
            else:
                progress.stop()
                console.print()
                console.print(
                    f"  :fireworks: checked {len(files)} files and everything was ok :fireworks:"
                )
                console.print()
                return

            # A failure was found; stop the remaining workers immediately.
            p.terminate()

        # Remember the failing sample so the next run re-checks it first.
        (last_samples_dir / f"{source_hash(failing_code)}.py").write_text(failing_code)

        def check_for_error(source: str) -> bool:
            """Return True if *source* still triggers the test failure."""
            with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                tmp_file.write(source.encode("utf8"))
                tmp_file.flush()
                test_ok = test_file(Path(tmp_file.name))
            # fix: delete=False previously leaked one temp file per
            # minimization step; remove it once the check is done.
            os.unlink(tmp_file.name)
            return not test_ok

        task_minimize = progress.add_task("minimize...")

        def update(current, total):
            # pysource_minimize counts remaining nodes down;
            # the progress bar counts completed work up.
            progress.update(task_minimize, completed=total - current, total=total)

        min_code = minimize(failing_code, check_for_error, progress_callback=update)

    name = f"{source_hash(min_code)}.py"

    mutmut_header = os.environ.get("MUTMUT_HEADER", None)
    header = ""
    if mutmut_header is not None:  # fix: identity comparison with None
        header = (
            "This sample was generated for the following code mutation detected by mutmut:\n\n"
            + mutmut_header
        )

        header = textwrap.indent(header, "# ", lambda _: True) + "\n"
        # fix: hash the full sample (header + code), not just the header —
        # otherwise different minimal samples for the same mutation would
        # collide on the same file name.
        name = f"{source_hash(header + min_code)}.py"

    min_code = header + min_code

    result_location = small_samples / name
    result_location.write_text(min_code)

    console.print()
    console.print("This is the minimal example to reproduce the bug:")
    console.print(Syntax.from_path(result_location, line_numbers=True))
    console.print()

    console.print(f"The example was saved under:\n  [blue]{result_location}")
    console.print()

    console.print("This example is now part of the test and can be run with:")
    console.print(
        f" > tox -e py{sys.version_info.major}{sys.version_info.minor} -- -k {name[:10]}"
    )
    console.print()

    console.print(
        "Have fun debugging :smiley: and dont forget to run me again,"
        " if you think you fixed everything."
    )


# Script entry point. The guard is mandatory here: the "spawn" start method
# re-imports this module in every worker process.
if __name__ == "__main__":
    main()