1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290
|
import collections
import caffe2.python.hypothesis_test_util as hu
import hypothesis.strategies as st
import numpy as np
from caffe2.python import core, dyndep, workspace
from caffe2.quantization.server import utils as dnnlowp_utils
from caffe2.quantization.server.dnnlowp_test_utils import (
avoid_vpmaddubsw_overflow_fc,
check_quantized_results_close,
run_conv_or_fc,
)
from hypothesis import given
dyndep.InitOpsLibrary("//caffe2/caffe2/quantization/server:dnnlowp_ops")
workspace.GlobalInit(["caffe2", "--caffe2_omp_num_threads=11"])
class DNNLowPFullyConnectedOpTest(hu.HypothesisTestCase):
# correctness test with no quantization error in inputs
@given(
input_channels=st.sampled_from([3, 4, 5, 8, 16, 32]),
output_channels=st.integers(2, 16),
batch_size=st.integers(0, 16),
in_quantized=st.booleans(),
out_quantized=st.booleans(),
weight_quantized=st.booleans(),
prepack_weight=st.booleans(),
preserve_activation_sparsity=st.booleans(),
preserve_weight_sparsity=st.booleans(),
fuse_relu=st.booleans(),
output_packed_bias=st.booleans(),
use_input_qparam=st.booleans(),
use_output_qparam=st.booleans(),
**hu.gcs_cpu_only
)
def test_dnnlowp_fully_connected_int(
self,
input_channels,
output_channels,
batch_size,
in_quantized,
out_quantized,
weight_quantized,
prepack_weight,
preserve_activation_sparsity,
preserve_weight_sparsity,
fuse_relu,
output_packed_bias,
use_input_qparam,
use_output_qparam,
gc,
dc,
):
# X and W have scale 1, so exactly represented after quantization
X_min = 0 if preserve_activation_sparsity else -77
X_max = X_min + 255
X = np.round(
np.random.rand(batch_size, input_channels) * (X_max - X_min) + X_min
)
X = X.astype(np.float32)
# input channels 0 and 1 are all X_min to avoid overflow from vpmaddubsw
# when multiplied with W_min and W_max
X[:, 0] = X_min
if batch_size != 0:
X[0, 1] = X_max
if preserve_weight_sparsity:
W_min = -128
W_max = 100
else:
W_min = -100
W_max = W_min + 255
W = np.round(
np.random.rand(output_channels, input_channels) * (W_max - W_min) + W_min
)
W = W.astype(np.float32)
W[0, 0] = W_min
W[1, 0] = W_max
# Make sure we won't have overflows from vpmaddubsw instruction used in
# fbgemm
avoid_vpmaddubsw_overflow_fc(
batch_size,
input_channels,
output_channels,
X,
X_min,
X_max,
W,
W_min,
W_max,
)
b = np.random.randn(output_channels).astype(np.float32)
Output = collections.namedtuple("Output", ["Y", "op_type", "engine"])
outputs = []
op_engine_list = [("FC", "", False, False)]
if fuse_relu:
op_engine_list += [("Int8FCRelu", "DNNLOWP", False, False)]
else:
op_engine_list += [
# type, engine, do_fuse, skip_requantization
("FC", "DNNLOWP", False, False),
("FC", "DNNLOWP_16", False, False),
("Int8FC", "DNNLOWP", False, False),
("Int8FC", "DNNLOWP", True, False),
("Int8FC", "DNNLOWP", False, True),
("Int8FC", "DNNLOWP", True, True),
]
for op_type, engine, do_fuse, skip_requantization in op_engine_list:
init_net = core.Net("test_init_net")
net = core.Net("test_net")
do_quantize = "DNNLOWP" in engine and in_quantized and not do_fuse
do_dequantize = "DNNLOWP" in engine and out_quantized and not skip_requantization
do_quantize_weight = (
engine == "DNNLOWP" and weight_quantized and len(outputs) > 0
)
do_prepack_weight = engine == "DNNLOWP" and prepack_weight
if do_quantize:
quantize = core.CreateOperator(
"Quantize",
["X"],
["X_q"],
preserve_activation_sparsity=preserve_activation_sparsity,
engine=engine,
device_option=gc,
)
net.Proto().op.extend([quantize])
X_min = 0 if X.size == 0 else X.min()
X_max = 0 if X.size == 0 else X.max()
x_q_param = dnnlowp_utils.choose_quantization_params(
X_min, X_max, preserve_activation_sparsity
)
w_q_param = None
if do_quantize_weight:
(
int8_given_tensor_fill,
w_q_param,
) = dnnlowp_utils.create_int8_given_tensor_fill(
W, "W_q", preserve_weight_sparsity
)
init_net.Proto().op.extend([int8_given_tensor_fill])
# Bias
int8_bias_tensor_fill = dnnlowp_utils.create_int8_bias_tensor_fill(
b, "b_q", x_q_param, w_q_param
)
init_net.Proto().op.extend([int8_bias_tensor_fill])
if do_prepack_weight:
inputs = ["W_q" if do_quantize_weight else "W"]
if do_dequantize:
inputs += ["b_q" if do_quantize_weight else "b"]
pack = core.CreateOperator(
"Int8FCPackWeight",
inputs,
["W_packed", "B_q32"]
if do_dequantize and output_packed_bias
else ["W_packed"],
preserve_weight_sparsity=preserve_weight_sparsity,
in_scale=x_q_param.scale,
engine=engine,
)
init_net.Proto().op.extend([pack])
fc = core.CreateOperator(
op_type,
[
"X_q" if do_quantize else "X",
"W_packed"
if do_prepack_weight
else ("W_q" if do_quantize_weight else "W"),
"b_q" if do_quantize_weight else "b",
# "quant_param",
],
["Y_q" if do_dequantize else "Y"],
dequantize_output=not do_dequantize,
preserve_activation_sparsity=preserve_activation_sparsity,
preserve_weight_sparsity=preserve_weight_sparsity,
engine=engine,
device_option=gc,
)
if op_type != "FC":
if (do_dequantize and use_output_qparam) or (use_input_qparam and op_type == "Int8_FC"):
fc.input.extend(["quant_param"])
if (use_input_qparam and op_type == "Int8_FC"):
fc.input.extend(["X_quant_param"])
if do_quantize_weight or do_prepack_weight:
# When quantized weight is provided, we can't rescale the
# output dynamically by looking at the range of output of each
# batch, so here we provide the range of output observed from
# fp32 reference implementation
dnnlowp_utils.add_quantization_param_args(
fc, outputs[0][0], preserve_activation_sparsity
)
net.Proto().op.extend([fc])
if fuse_relu and "DNNLOWP" not in engine:
net.Relu(["Y"], "Y")
if do_dequantize:
dequantize = core.CreateOperator(
"Dequantize", ["Y_q"], ["Y"], engine=engine, device_option=gc
)
net.Proto().op.extend([dequantize])
if use_output_qparam and do_dequantize and op_type != "FC":
ref_output = outputs[0][0]
ref_output_min = 0 if ref_output.size == 0 else ref_output.min()
ref_output_max = 0 if ref_output.size == 0 else ref_output.max()
q_param = dnnlowp_utils.choose_quantization_params(
ref_output_min, ref_output_max, preserve_activation_sparsity
)
q_param_scale = q_param.scale
q_param_zero_point = q_param.zero_point
else:
q_param_scale = None
q_param_zero_point = None
if not (use_input_qparam and op_type == "Int8FC"):
x_q_param_scale = None
x_q_param_zero_point = None
else:
x_q_param_scale = x_q_param.scale
x_q_param_zero_point = x_q_param.zero_point
run_conv_or_fc(
self,
init_net,
net,
X,
W,
b,
op_type,
engine,
None,
gc,
outputs,
q_param_scale,
q_param_zero_point,
x_q_param_scale,
x_q_param_zero_point,
)
if output_packed_bias and do_prepack_weight and do_dequantize:
bias_int32 = self.ws.blobs["B_q32"].fetch()
if do_quantize_weight:
np.testing.assert_equal(
bias_int32[0], np.round(b / (x_q_param.scale * w_q_param.scale))
)
np.testing.assert_equal(bias_int32[0].dtype, np.int32)
shapes, types = workspace.InferShapesAndTypes(
[init_net, net],
blob_dimensions={
"X": [batch_size, input_channels],
"W": [output_channels, input_channels],
"b": [output_channels],
"quant_param": [1],
"X_quant_param": [1],
},
blob_types={
"X": core.DataType.FLOAT,
"W": core.DataType.FLOAT,
"b": core.DataType.FLOAT,
"quant_param": core.DataType.FLOAT,
"X_quant_param": core.DataType.FLOAT,
},
)
assert (
"Y" in shapes and "Y" in types
), "Failed to infer the shape or type of Y"
self.assertEqual(shapes["Y"], [batch_size, output_channels])
self.assertEqual(types["Y"], core.DataType.FLOAT)
check_quantized_results_close(outputs, symmetric=preserve_activation_sparsity)
|