File: lengths_top_k_ops_test.py

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (60 lines) | stat: -rw-r--r-- 2,371 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60





from caffe2.python import core
from hypothesis import given
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
import numpy as np


class TestLengthsTopKOps(serial.SerializedTestCase):
    @serial.given(N=st.integers(min_value=0, max_value=10),
           K=st.integers(min_value=1, max_value=10),
           **hu.gcs_cpu_only)
    def test_lengths_top_k_op(self, N, K, gc, dc):
        lens = np.random.randint(low=1, high=2 * K + 1, size=N).astype(np.int32)
        X = []
        for i in lens:
            X.extend(x / 100.0 for x in range(0, 6 * i, 6))
        X = np.array(X, dtype=np.float32)
        op = core.CreateOperator("LengthsTopK", ["X", "Y"], ["values", "indices"], k=K)

        def lengths_top_k(X, lens):
            N, si = lens.shape[0], 0
            values, indices = [], []
            for i in range(N):
                cur_indices = X[si:si + lens[i]].argsort()[-K:][::-1]
                cur_values = X[si:si + lens[i]][cur_indices]
                values.extend(cur_values)
                indices.extend(cur_indices)
                si += lens[i]
                if lens[i] < K:
                    values.extend([0] * (K - lens[i]))
                    indices.extend([-1] * (K - lens[i]))

            return (np.array(values, dtype=np.float32).reshape(-1, K),
                    np.array(indices, dtype=np.int32).reshape(-1, K))

        self.assertDeviceChecks(dc, op, [X, lens], [0, 1])
        self.assertReferenceChecks(gc, op, [X, lens], lengths_top_k)
        self.assertGradientChecks(gc, op, [X, lens], 0, [0])

    @given(N=st.integers(min_value=0, max_value=10),
           K=st.integers(min_value=1, max_value=10),
           **hu.gcs_cpu_only)
    def test_lengths_top_k_empty_op(self, N, K, gc, dc):
        lens = np.zeros((N, ), dtype=np.int32)
        X = np.array([], dtype=np.float32)
        op = core.CreateOperator("LengthsTopK", ["X", "Y"], ["values", "indices"], k=K)

        def lengths_top_k(X, lens):
            return (np.zeros((N, K), dtype=np.float32),
                    -1 * np.ones((N, K), dtype=np.int32))

        self.assertDeviceChecks(dc, op, [X, lens], [0, 1])
        self.assertReferenceChecks(gc, op, [X, lens], lengths_top_k)
        self.assertGradientChecks(gc, op, [X, lens], 0, [0])