File: flexible_top_k_test.py

package info (click to toggle)
pytorch 1.13.1%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 139,252 kB
  • sloc: cpp: 1,100,274; python: 706,454; ansic: 83,052; asm: 7,618; java: 3,273; sh: 2,841; javascript: 612; makefile: 323; xml: 269; ruby: 185; yacc: 144; objc: 68; lex: 44
file content (77 lines) | stat: -rw-r--r-- 2,609 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77





from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial

from collections import OrderedDict
from hypothesis import given, settings
import numpy as np


class TestFlexibleTopK(serial.SerializedTestCase):
    def flexible_top_k_ref(self, X, k):
        X_flat = X.reshape((-1, X.shape[-1]))
        indices_ref = np.ndarray(shape=sum(k), dtype=np.int32)
        values_ref = np.ndarray(shape=sum(k), dtype=np.float32)
        offset = 0
        for i in range(X_flat.shape[0]):
            od = OrderedDict()
            for j in range(X_flat.shape[1]):
                val = X_flat[i, j]
                if val not in od:
                    od[val] = []
                od[val].append(j)
            k_ = 0
            for val, idxs in sorted(od.items(), reverse=True):
                for idx in idxs:
                    indices_ref[offset + k_] = idx
                    values_ref[offset + k_] = val
                    k_ += 1
                    if k_ >= k[i]:
                        break
                if k_ >= k[i]:
                    break
            offset += k[i]

        return (values_ref, indices_ref)

    @given(X=hu.tensor(min_dim=2), **hu.gcs_cpu_only)
    @settings(deadline=10000)
    def test_flexible_top_k(self, X, gc, dc):
        X = X.astype(dtype=np.float32)
        k_shape = (int(X.size / X.shape[-1]), )
        k = np.random.randint(1, high=X.shape[-1] + 1, size=k_shape)

        output_list = ["Values", "Indices"]
        op = core.CreateOperator("FlexibleTopK", ["X", "k"], output_list,
                                 device_option=gc)

        def bind_ref(X_loc, k):
            ret = self.flexible_top_k_ref(X_loc, k)
            return ret

        self.assertReferenceChecks(gc, op, [X, k], bind_ref)

    @given(X=hu.tensor(min_dim=2), **hu.gcs_cpu_only)
    @settings(deadline=10000)
    def test_flexible_top_k_grad(self, X, gc, dc):
        X = X.astype(np.float32)
        k_shape = (int(X.size / X.shape[-1]), )
        k = np.random.randint(1, high=X.shape[-1] + 1, size=k_shape)

        # this try to make sure adding stepsize (0.05)
        # will not change TopK selections at all
        # since dims max_value = 5 as defined in
        # caffe2/caffe2/python/hypothesis_test_util.py
        for i in range(X.shape[-1]):
            X[..., i] = i * 1.0 / X.shape[-1]

        op = core.CreateOperator(
            "FlexibleTopK", ["X", "k"], ["Values", "Indices"], device_option=gc
        )

        self.assertGradientChecks(gc, op, [X, k], 0, [0])