1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
from caffe2.python import core
import caffe2.python.hypothesis_test_util as hu
import caffe2.python.serialized_test.serialized_test_util as serial
import hypothesis.strategies as st
from hypothesis import given, settings
import numpy as np
class TestClipTensorByScalingOp(serial.SerializedTestCase):
    """Reference test for the ``ClipTensorByScaling`` operator."""

    @given(
        n=st.integers(5, 8),
        d=st.integers(2, 4),
        threshold=st.floats(0.1, 10),
        additional_threshold=st.floats(0.1, 10),
        use_additional_threshold=st.booleans(),
        inplace=st.booleans(),
        **hu.gcs_cpu_only
    )
    @settings(deadline=10000)
    def test_clip_tensor_by_scaling(
        self, n, d, threshold, additional_threshold,
        use_additional_threshold, inplace, gc, dc
    ):
        """Check the operator's output against a NumPy reference.

        A random ``(n, d)`` float32 tensor and its norm (as returned by
        ``np.linalg.norm``) are fed to the operator, optionally together
        with a one-element ``additional_threshold`` blob.  The operator
        writes either to a separate ``Y`` blob or back into ``tensor``
        when ``inplace`` is set.

        ``gc`` is used as the device option; ``dc`` is supplied by the
        ``hu.gcs_cpu_only`` strategy and is not used here.
        """
        data = np.random.rand(n, d).astype(np.float32)
        data_norm = np.array(np.linalg.norm(data))
        extra_threshold_blob = np.array(
            [additional_threshold]
        ).astype(np.float32)

        def reference(blob, blob_norm, extra_threshold=None):
            # The effective limit is the op's ``threshold`` argument,
            # multiplied by the optional third input when it is present.
            limit = threshold
            if extra_threshold is not None:
                limit = threshold * extra_threshold

            # Nothing to do unless the norm exceeds the limit.
            if not (blob_norm > limit):
                return [blob]

            # Rescale by limit / norm.
            return [blob * (limit / float(blob_norm))]

        # Blob names and the matching arrays are built side by side so
        # they cannot drift apart.
        input_names = ["tensor", "val"]
        feeds = [data, data_norm]
        if use_additional_threshold:
            input_names.append("additional_threshold")
            feeds.append(extra_threshold_blob)

        output_names = ["tensor"] if inplace else ['Y']

        op = core.CreateOperator(
            "ClipTensorByScaling",
            input_names,
            output_names,
            threshold=threshold,
        )

        self.assertReferenceChecks(
            device_option=gc,
            op=op,
            inputs=feeds,
            reference=reference,
        )
if __name__ == "__main__":
    # Run this module's tests when it is executed directly as a script.
    from unittest import main

    main()
|