File: weighted_sample_test.py

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (80 lines) | stat: -rw-r--r-- 2,739 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80





import numpy as np

from hypothesis import given
import hypothesis.strategies as st

from caffe2.python import core
from caffe2.python import workspace
import caffe2.python.hypothesis_test_util as hu


class TestWeightedSample(hu.HypothesisTestCase):
    """Checks the Caffe2 ``WeightedSample`` operator against one-hot weights."""

    @given(
        batch=st.integers(min_value=0, max_value=128),
        weights_len=st.integers(min_value=0, max_value=128),
        **hu.gcs
    )
    def test_weighted_sample(self, batch, weights_len, gc, dc):
        """Run ``WeightedSample`` on one-hot rows and compare with the hot column.

        Every row of ``weights`` receives a single 1.0 at a randomly drawn
        column, and ``values`` carries a random number at that same position.
        The test asserts that the operator reports exactly that column (and,
        when ``values`` is supplied, exactly that number) for each row.  If
        ``batch`` or ``weights_len`` is zero, nothing is drawn and the empty
        expectation arrays are compared with the fetched outputs as a whole.

        Args:
            batch: Number of rows, drawn by hypothesis from [0, 128].
            weights_len: Number of columns per row, drawn from [0, 128].
            gc: Supplied through ``hu.gcs``; not used in this test body.
            dc: Supplied through ``hu.gcs``; forwarded to
                ``assertDeviceChecks``.
        """
        # True only when there is at least one row and one column to sample.
        non_empty = batch > 0 and weights_len > 0

        shape = (batch, weights_len)
        weights = np.zeros(shape)
        values = np.zeros(shape)
        hot_columns = []
        hot_values = []
        if non_empty:
            for row in range(batch):
                # Draw the column first, then the value, once per row.
                column = np.random.randint(0, weights_len)
                value = np.random.rand()
                hot_columns.append(column)
                hot_values.append(value)
                weights[row, column] = 1.0
                values[row, column] = value

        # Expectations are held as float32 arrays and compared numerically.
        expected_indices = np.array(hot_columns, dtype=np.float32)
        expected_values = np.array(hot_values, dtype=np.float32)
        weights_f32 = weights.astype(np.float32)
        values_f32 = values.astype(np.float32)
        workspace.FeedBlob("weights", weights_f32)
        workspace.FeedBlob("values", values_f32)

        # Variant 1: the operator emits both indices and values.
        both_op = core.CreateOperator(
            "WeightedSample",
            ["weights", "values"],
            ["sample_indices", "sample_values"],
        )
        workspace.RunOperatorOnce(both_op)
        got_indices = workspace.FetchBlob("sample_indices")
        got_values = workspace.FetchBlob("sample_values")
        if not non_empty:
            np.testing.assert_allclose(expected_indices, got_indices)
            np.testing.assert_allclose(expected_values, got_values)
        else:
            for row in range(batch):
                np.testing.assert_allclose(
                    expected_indices[row], got_indices[row]
                )
                np.testing.assert_allclose(
                    expected_values[row], got_values[row]
                )
        self.assertDeviceChecks(dc, both_op, [weights_f32, values_f32], [0, 1])

        # Variant 2: the operator emits indices only.
        indices_op = core.CreateOperator(
            "WeightedSample", ["weights"], ["sample_indices_2"]
        )
        workspace.RunOperatorOnce(indices_op)
        got_indices_only = workspace.FetchBlob("sample_indices_2")
        if not non_empty:
            np.testing.assert_allclose(expected_indices, got_indices_only)
        else:
            for row in range(batch):
                np.testing.assert_allclose(
                    expected_indices[row], got_indices_only[row]
                )
        self.assertDeviceChecks(dc, indices_op, [weights_f32], [0])


if __name__ == "__main__":
    # Executed as a script: hand control to unittest's command-line runner.
    from unittest import main
    main()