File: bisect_percentile_op_test.py

package info (click to toggle)
pytorch 1.13.1+dfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (179 lines) | stat: -rw-r--r-- 6,290 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
from typing import List

import hypothesis.strategies as st

from caffe2.python import core, workspace
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu

import bisect
import numpy as np


class TestBisectPercentileOp(hu.HypothesisTestCase):
    """Compare the ``BisectPercentile`` operator with a pure-Python reference.

    The percentile tables are passed as flat arrays: the entries for feature
    (column) ``j`` of ``raw_data`` occupy ``lengths[j]`` consecutive slots,
    feature 0 first.
    """

    def compare_reference(
            self,
            raw_data,
            pct_raw_data,
            pct_mapping,
            pct_upper,
            pct_lower,
            lengths,
    ):
        """Run ``BisectPercentile`` on ``raw_data`` and check it against a reference.

        NOTE: ``pct_upper`` precedes ``pct_lower`` in this signature. Call it
        with keyword arguments so the two bound arrays cannot be swapped by
        accident.

        Args:
            raw_data: 2-D array; element ``[i][j]`` is the value of feature
                ``j`` in row ``i``.
            pct_raw_data: flat array of the raw values that have a known
                percentile, concatenated feature by feature.
            pct_mapping: flat array, the result used when a value matches an
                entry of ``pct_raw_data`` exactly.
            pct_upper: flat array, the bound taken at the left neighbour when
                interpolating.
            pct_lower: flat array, the bound taken at the right neighbour when
                interpolating.
            lengths: number of table entries per feature.

        Raises:
            AssertionError: if the operator output differs from the reference
                (checked with ``np.testing.assert_array_almost_equal``).
        """
        def bisect_percentile_op_ref(
            raw_data,
            pct_raw_data,
            pct_mapping,
            pct_lower,
            pct_upper,
            lengths
        ):
            """Reference implementation; returns an array shaped like ``raw_data``."""
            results = np.zeros_like(raw_data)
            # indices[j]:indices[j + 1] is the slice of the flat percentile
            # arrays that belongs to feature j.
            indices = [0]
            for j in range(len(lengths)):
                indices.append(indices[j] + lengths[j])
            for i in range(len(raw_data)):
                for j in range(len(raw_data[0])):
                    start = indices[j]
                    end = indices[j + 1]
                    val = raw_data[i][j]
                    pct_raw_data_i = pct_raw_data[start:end]
                    pct_lower_i = pct_lower[start:end]
                    pct_upper_i = pct_upper[start:end]
                    pct_mapping_i = pct_mapping[start:end]

                    # Corner cases: values outside the table clamp to 0 / 1.
                    if val < pct_raw_data_i[0]:
                        results[i][j] = 0
                        continue
                    if val > pct_raw_data_i[-1]:
                        results[i][j] = 1.
                        continue

                    # After the guards above, pct_raw_data_i[0] <= val <=
                    # pct_raw_data_i[-1], so bisect_left returns a valid index.
                    k = bisect.bisect_left(pct_raw_data_i, val)
                    if pct_raw_data_i[k] == val:
                        # Exact hit: use the mapped value directly.
                        results[i][j] = pct_mapping_i[k]
                    else:
                        # val lies strictly between entries k - 1 and k:
                        # interpolate linearly from the upper bound of the
                        # left entry to the lower bound of the right entry.
                        k = k - 1
                        slope = ((pct_lower_i[k + 1] - pct_upper_i[k])
                            / (pct_raw_data_i[k + 1] - pct_raw_data_i[k]))
                        results[i][j] = pct_upper_i[k] + \
                            slope * (val - pct_raw_data_i[k])

            return results

        workspace.ResetWorkspace()
        workspace.FeedBlob("raw_data", raw_data)

        op = core.CreateOperator(
            "BisectPercentile",
            ["raw_data"],
            ["pct_output"],
            percentile_raw=pct_raw_data,
            percentile_mapping=pct_mapping,
            percentile_lower=pct_lower,
            percentile_upper=pct_upper,
            lengths=lengths
        )
        workspace.RunOperatorOnce(op)

        # Keyword arguments keep lower/upper aligned with the reference's
        # parameter order, which differs from this method's.
        expected_output = bisect_percentile_op_ref(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_lower=pct_lower,
            pct_upper=pct_upper,
            lengths=lengths,
        )
        output = workspace.blobs['pct_output']
        np.testing.assert_array_almost_equal(output, expected_output)

    def test_bisect_percentil_op_simple(self):
        """Hand-written case: two features with 3 and 2 table entries."""
        raw_data = np.array([
            [1, 1],
            [2, 2],
            [3, 3],
            [3, 1],
            [9, 10],
            [1.5, 5],
            [1.32, 2.4],
            [2.9, 5.7],
            [-1, -1],
            [3, 7]
        ], dtype=np.float32)
        # First three entries belong to feature 0, last two to feature 1
        # (see ``lengths`` below).
        pct_raw_data = np.array([1, 2, 3, 2, 7], dtype=np.float32)
        pct_lower = np.array([0.1, 0.2, 0.9, 0.1, 0.5], dtype=np.float32)
        pct_upper = np.array([0.1, 0.8, 1.0, 0.4, 1.0], dtype=np.float32)
        pct_mapping = np.array([0.1, 0.5, 0.95, 0.25, 0.75], dtype=np.float32)
        lengths = np.array([3, 2], dtype=np.int32)
        # Passed by keyword: compare_reference declares pct_upper before
        # pct_lower, so a positional (lower, upper) call would swap them.
        self.compare_reference(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_upper=pct_upper,
            pct_lower=pct_lower,
            lengths=lengths,
        )

    @given(
        N=st.integers(min_value=20, max_value=100),
        lengths_in=st.lists(
            elements=st.integers(min_value=2, max_value=10),
            min_size=2,
            max_size=5,
        ),
        max_value=st.integers(min_value=100, max_value=1000),
        discrete=st.booleans(),
        p=st.floats(min_value=0, max_value=0.9),
        **hu.gcs_cpu_only
    )
    def test_bisect_percentil_op_large(
        self, N: int, lengths_in: List[int], max_value: int, discrete: bool, p: float, gc, dc
    ):
        """Randomised case with generated percentile tables.

        Args:
            N: number of rows in ``raw_data``.
            lengths_in: table size per feature; its length is the number of
                features.
            max_value: exclusive upper bound for values when ``discrete``.
            discrete: draw integers in ``[0, max_value)`` instead of standard
                normal samples.
            p: probability that an entry's upper bound is strictly above its
                lower bound.
            gc: injected through ``hu.gcs_cpu_only``; unused here.
            dc: injected through ``hu.gcs_cpu_only``; unused here.
        """
        lengths = np.array(lengths_in, dtype=np.int32)
        D = len(lengths)

        if discrete:
            raw_data = np.random.randint(0, max_value, size=(N, D))
        else:
            raw_data = np.random.randn(N, D)

        # Build valid pct_lower / pct_upper tables, one feature at a time.
        pct_lower = []
        pct_upper = []
        pct_raw_data = []
        for i in range(D):
            pct_lower_val = 0.
            pct_upper_val = 0.
            pct_lower_cur = []
            pct_upper_cur = []
            # pct_raw_data must not contain duplicated values within a feature.
            if discrete:
                pct_raw_data_cur = np.random.choice(
                    np.arange(max_value), size=lengths[i], replace=False)
            else:
                pct_raw_data_cur = np.random.randn(lengths[i])
                # Redraw until all samples are distinct.
                while len(set(pct_raw_data_cur)) < lengths[i]:
                    pct_raw_data_cur = np.random.randn(lengths[i])
            pct_raw_data_cur = np.sort(pct_raw_data_cur)
            for _ in range(lengths[i]):
                # Each lower bound sits 0.01 above the previous upper bound.
                pct_lower_val = pct_upper_val + 0.01
                pct_lower_cur.append(pct_lower_val)
                # With probability p widen the entry by a random multiple of
                # 0.01; otherwise upper == lower.
                pct_upper_val = pct_lower_val + \
                    0.01 * np.random.randint(1, 20) * (np.random.uniform() < p)
                pct_upper_cur.append(pct_upper_val)
            # Normalization: scale so the last upper bound of the feature is 1.
            pct_lower_cur = np.array(pct_lower_cur, np.float32) / pct_upper_val
            pct_upper_cur = np.array(pct_upper_cur, np.float32) / pct_upper_val
            pct_lower.extend(pct_lower_cur)
            pct_upper.extend(pct_upper_cur)
            pct_raw_data.extend(pct_raw_data_cur)

        pct_lower = np.array(pct_lower, dtype=np.float32)
        pct_upper = np.array(pct_upper, dtype=np.float32)
        # The exact-match value is the midpoint of the two bounds.
        pct_mapping = (pct_lower + pct_upper) / 2.
        raw_data = np.array(raw_data, dtype=np.float32)
        pct_raw_data = np.array(pct_raw_data, dtype=np.float32)

        # Passed by keyword: compare_reference declares pct_upper before
        # pct_lower, so a positional (lower, upper) call would swap them.
        self.compare_reference(
            raw_data=raw_data,
            pct_raw_data=pct_raw_data,
            pct_mapping=pct_mapping,
            pct_upper=pct_upper,
            pct_lower=pct_lower,
            lengths=lengths,
        )


if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    from unittest import main as run_unittest_main
    run_unittest_main()