File: graph.py

package info (click to toggle)
python-hypothesis 6.148.2-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 15,408 kB
  • sloc: python: 63,888; ruby: 1,107; sh: 266; makefile: 42; javascript: 6
file content (131 lines) | stat: -rw-r--r-- 4,028 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# This file is part of Hypothesis, which may be found at
# https://github.com/HypothesisWorks/hypothesis/
#
# Copyright the Hypothesis Authors.
# Individual contributors are listed in AUTHORS.rst and the git log.
#
# This Source Code Form is subject to the terms of the Mozilla Public License,
# v. 2.0. If a copy of the MPL was not distributed with this file, You can
# obtain one at https://mozilla.org/MPL/2.0/.

import json
import math
import statistics
from pathlib import Path

import click


def plot_vega(vega_spec, data, *, to, parameters=None):
    """Render a Vega spec to a PNG file, injecting ``data`` and signal values.

    The spec is parsed as JSON from ``vega_spec.read_text()``. ``data`` is
    inserted as the first entry of the spec's ``"data"`` list under the name
    ``"source"``, and every item of ``parameters`` is appended to the spec's
    ``"signals"`` list (created if absent) as ``{"name": ..., "value": ...}``.

    Args:
        vega_spec: Object exposing ``read_text()`` that returns the spec as
            JSON text (the caller in this file passes a ``pathlib.Path``).
        data: Values stored in the injected ``"source"`` dataset.
        to: Destination of the PNG; passed to ``open(to, "wb")``.
        parameters: Optional mapping of signal name to signal value.

    Raises:
        KeyError: If the parsed spec has no ``"data"`` entry.
    """
    import vl_convert

    spec = json.loads(vega_spec.read_text())
    spec["data"].insert(0, {"name": "source", "values": data})
    signals = spec.setdefault("signals", [])
    signals.extend(
        {"name": name, "value": value} for name, value in (parameters or {}).items()
    )

    # Render *before* opening the destination: opening with "wb" truncates the
    # file immediately, so a failing conversion would otherwise leave an empty
    # (or clobbered) output file behind.
    # default ppi is 72, which is somewhat blurry.
    png = vl_convert.vega_to_png(spec, ppi=200)
    with open(to, "wb") as f:
        f.write(png)


def _mean_difference_ci(n1, n2, *, confidence):
    """Return the margin of error for the difference between two sample means.

    The result is the two-sided Student's t critical value at ``confidence``
    multiplied by the pooled standard error of ``n1`` and ``n2``, i.e. the
    amount to add and subtract around the observed mean difference.

    Args:
        n1: First sample; needs at least two values (``statistics.variance``).
        n2: Second sample; needs at least two values.
        confidence: Confidence level, e.g. ``0.95``.
    """
    from scipy import stats

    variance1, variance2 = statistics.variance(n1), statistics.variance(n2)
    size1, size2 = len(n1), len(n2)
    degrees_of_freedom = size1 + size2 - 2
    # this assumes equal variances between the populations of n1 and n2. This
    # is not necessarily true (new might be more consistent than old), but it's
    # good enough.
    pooled_variance = (
        (size1 - 1) * variance1 + (size2 - 1) * variance2
    ) / degrees_of_freedom
    standard_error = math.sqrt(pooled_variance) * math.sqrt(1 / size1 + 1 / size2)
    critical_value = stats.t.ppf((1 + confidence) / 2, degrees_of_freedom)
    return critical_value * standard_error


def _process_benchmark_data(data):
    assert set(data) == {"old", "new"}
    old_calls = data["old"]["calls"]
    new_calls = data["new"]["calls"]
    assert set(old_calls) == set(new_calls), set(old_calls).symmetric_difference(
        set(new_calls)
    )

    graph_data = []

    def _diff_times(old, new):
        if old == 0 and new == 0:
            return 0
        if old == 0:
            # there aren't any great options here, but 0 is more reasonable than inf.
            return 0
        v = (old - new) / old
        if 0 < v < 1:
            v = (1 / (1 - v)) - 1
        return v

    sums = {"old": 0, "new": 0}
    for node_id in old_calls:
        old = old_calls[node_id]
        new = new_calls[node_id]
        if (
            set(old) | set(new) == {0}
            or len(old) != len(new)
            or len(old) == len(new) == 0
        ):
            print(f"skipping {node_id}")
            continue

        sums["old"] += statistics.mean(old)
        sums["new"] += statistics.mean(new)
        diffs = [n_old - n_new for n_old, n_new in zip(old, new, strict=True)]
        diffs_times = [
            _diff_times(n_old, n_new) for n_old, n_new in zip(old, new, strict=True)
        ]
        ci_shrink = (
            _mean_difference_ci(old, new, confidence=0.95) if len(old) > 1 else 0
        )

        graph_data.append(
            {
                "node_id": node_id,
                "absolute": statistics.mean(diffs),
                "absolute_ci_lower": ci_shrink,
                "absolute_ci_upper": ci_shrink,
                "nx": statistics.mean(diffs_times),
                "nx_ci_lower": 0,
                "nx_ci_upper": 0,
            }
        )

    graph_data = sorted(graph_data, key=lambda d: d["absolute"])
    return graph_data, sums


# Command-line entry point: read the benchmark JSON at DATA, process it, and
# render the chart described by the sibling "spec.json" to OUT.
# (Documented with comments rather than a docstring, since click would show a
# docstring as the command's --help text.)
@click.command()
@click.argument("data", type=click.Path(exists=True, path_type=Path))
@click.argument("out", type=click.Path(path_type=Path))
def plot(data, out):
    benchmark = json.loads(data.read_text())
    graph_data, sums = _process_benchmark_data(benchmark)

    # exposed to the Vega spec as signals, one per key.
    parameters = {
        "title": "Shrinking benchmark (calls)",
        "sum_old": sums["old"],
        "sum_new": sums["new"],
        "absolute_axis_title": "shrink call change (old - new, larger is good)",
    }
    # the spec lives next to this file.
    spec_path = Path(__file__).parent / "spec.json"
    plot_vega(spec_path, data=graph_data, to=out, parameters=parameters)


# Invoke the click command only when this file is executed as a script.
if __name__ == "__main__":
    plot()