import torch
import numpy as np
import argparse
from typing import Dict
# debug print
DEBUG_PRINT = False
################################################################################
# configuration for random tests setup
################################################################################
# maximum number of tensors as inputs
MAX_TENSOR = 6
# maximum tensor rank
MAX_TENSOR_DIM = 5
# maximum tensor size
MAX_TENSOR_SIZE = 2**20
# use a size 1 tensor for debug
DEBUG_TENSOR = False
# tensor device
DEVICE = "cuda"
# data type for tensors
DTYPE = torch.float
# factor that roughly controls the depth of the generated graph
GRAPH_FACTOR = 2
################################################################################
# helper functions
################################################################################
class WrongResultException(Exception):
pass
# randomly reduce tensor_shape while keeping it broadcast-compatible
# two things are done here:
# 1. trim leading dimensions;
# 2. randomly clamp some of the remaining dimensions to size 1.
def get_broadcast_compatible_shape(tensor_shape):
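    """Return a copy of tensor_shape that can broadcast against the original.

    One possible outcome for [4, 3, 2] is [3, 2] (leading dim trimmed) or
    [4, 1, 2] (one dim clamped to 1), depending on the random draws.
    """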
max_dim = len(tensor_shape)
num_b_dims = np.random.randint(0, max_dim + 1)
trim_head = np.random.randint(0, min(num_b_dims + 1, max_dim))
shape = tensor_shape[trim_head:max_dim]
for i in np.random.choice(range(max_dim - trim_head),
num_b_dims - trim_head,
replace=False):
shape[i] = 1
return shape
# generate a random graph topology driven by the seed and the module-level flags
def random_topology_test(seed, *inp_tensor_list):
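    """Apply a random sequence of unary/binary ops to the input tensors.

    The seed travels as a 0-dim tensor so that it survives torch.jit.trace;
    operations are applied until every input ends up in a single dependency
    set, and the surviving candidates plus a random sample of the
    intermediates are returned as outputs.
    """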
np.random.seed(int(seed.numpy().tolist()))
tensor_list = [*inp_tensor_list]
num_tensor = len(tensor_list)
# randomly add available constant value
num_const = np.random.randint(0, num_tensor + 1)
const_list = np.random.random(num_const)
if DEBUG_PRINT:
for const_item in const_list:
print("----- real number {:.10f}", const_item)
    # we require all tensors to end up in a single dependency set
def get_root(x, dependency_map):
if x in dependency_map:
return get_root(dependency_map[x], dependency_map)
else:
return x
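    # d_map chains each joined tensor to the op output that consumed it, e.g.
    # with d_map == {0: 6, 6: 7}, get_root(0, d_map) follows 0 -> 6 -> 7 and
    # returns 7, the current representative of tensor 0's dependency set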
d_map: Dict[int, int] = {}
num_sets = num_tensor
candidate = list(range(num_tensor))
unary_operations = [torch.sigmoid, torch.relu]
binary_operations = [torch.add, torch.sub, torch.mul]
u_op_size = len(unary_operations)
b_op_size = len(binary_operations)
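    # the operation budget is drawn from [num_sets - 1, num_sets * GRAPH_FACTOR);
    # at least num_sets - 1 binary joins are needed to merge all dependency sets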
num_operations = np.random.randint(num_sets - 1,
num_sets * GRAPH_FACTOR)
ret_list = []
while num_operations >= 0 or num_sets > 1:
        # start off by randomly picking a candidate and an operation
index = np.random.randint(0, len(candidate))
op_index = np.random.randint(0, u_op_size + b_op_size)
lh_index = candidate[index]
rh_index = None
out_tensor = None
if DEBUG_PRINT:
print("iteration {0}, num_sets{1}, candidates {2}, tensor_list {3}, lh_index {4}, op_index {5}".format(
num_operations, num_sets, candidate, len(tensor_list), lh_index, op_index))
if num_operations >= 0:
num_operations -= 1
if op_index < u_op_size:
                # unary operation: just apply the chosen op to the candidate
out_tensor = unary_operations[op_index](tensor_list[lh_index])
else:
                # binary operation, randomly choose the other operand:
                #   1. tensor-tensor operation -> rh_index
                #   2. tensor-const operation
                # we are no longer restricted to candidate tensors here.
op_2_index = np.random.randint(0, len(tensor_list) + num_const)
if op_2_index < len(tensor_list):
if op_2_index == lh_index:
                        # if we unluckily picked the candidate itself again,
                        # just move on to another tensor
op_2_index = (op_2_index + 1) % len(tensor_list)
                    # output tensor creation is deferred until rh_index is final (see below)
rh_index = op_2_index
else:
left = tensor_list[lh_index]
right = const_list[op_2_index - len(tensor_list)]
# if np.random.randint(0, 2) > 0:
# left = const_list[op_2_index - len(tensor_list)]
# right = tensor_list[lh_index]
out_tensor = binary_operations[op_index - u_op_size](left, right)
if DEBUG_PRINT:
print("binary, op_2_index {0}, rh_index ?{1}".format(op_2_index, rh_index))
else:
            # binary operation: randomly pick two candidates.
            # this is not the most efficient way to close the dependency sets,
            # as the two candidates could already be connected
cand_index = np.random.randint(0, len(candidate))
if cand_index == index:
cand_index = (cand_index + 1) % len(candidate)
            # output tensor creation is deferred until rh_index is final (see below)
rh_index = candidate[cand_index]
if DEBUG_PRINT:
print("binary rh_index ?{0}".format(rh_index))
        # candidate must be updated before rh_index is removed from it
candidate[index] = len(tensor_list)
lh_root = get_root(lh_index, d_map)
        # create the binary op's output tensor now that rh_index is final
if rh_index is not None:
out_tensor = binary_operations[op_index - u_op_size](
tensor_list[lh_index],
tensor_list[rh_index])
# remove rh_index from candidate if it is used
if rh_index in candidate:
                # list.remove() removes by value, not by index
candidate.remove(rh_index)
# check if we join dependency sets:
rh_root = get_root(rh_index, d_map)
if lh_root != rh_root:
num_sets -= 1
                # point rh_root at the new output tensor; this is only needed
                # when the two roots differ
d_map[rh_root] = len(tensor_list)
# no joining, just update dependency
d_map[lh_root] = len(tensor_list)
        # append the new output tensor; candidate[index] already points at it,
        # which avoids applying the identical operation to the same tensor(s)
tensor_list.append(out_tensor)
    # TODO: we should mark
    # union(random_sample(tensor_list[num_tensor:]), candidate) as outputs,
    # which would ensure we have no dead branches and a connected computation
    # graph. However, it won't work easily if we have broadcast.
    # I have disabled broadcast for now to focus on the topology test.
for ind in candidate:
ret_list.append(tensor_list[ind])
out_list = np.random.choice(
range(num_tensor, len(tensor_list)),
np.random.randint(0, len(tensor_list) - num_tensor),
False)
for ind in out_list:
if ind not in candidate:
ret_list.append(tensor_list[ind])
if DEBUG_PRINT:
print("ended with tensor_list: {0}".format(len(tensor_list)))
return tuple(ret_list)
def prepareInputTensorsToRandomTopoTest(seed,
max_tensor_num,
max_tensor_dim,
max_tensor_size,
debug_tensor,
device,
dtype):
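    """Seed numpy/torch and build the randomized input tensors.

    Returns a 0-dim seed tensor (later threaded through torch.jit.trace
    into random_topology_test) plus a list of random tensors that share
    one shape, except for the ones selected for broadcasting (currently
    none; see the TODO below).
    """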
# set seed to numpy as well as torch
np.random.seed(seed)
torch.manual_seed(np.random.randint(0, seed))
# seed to pass to torch.jit.trace
seed_tensor = torch.tensor(np.random.randint(0, seed))
# random number of input tensors
num_tensor = np.random.randint(1, max_tensor_num)
# prepare randomized tensor shape
tensor_dim = np.random.randint(1, max_tensor_dim)
tensor_shape = []
numel = 1
if debug_tensor:
tensor_shape.append(1)
else:
for i in range(tensor_dim):
            # keep the running numel under max_tensor_size, leaving headroom
            # for the remaining dimensions, then clamp each size to at most 255
            size_i = np.random.randint(1, int(max_tensor_size / numel / (2**(tensor_dim - i))))
            size_i = min(size_i, 128 + size_i % 128)
tensor_shape.insert(0, size_i)
numel *= size_i
if DEBUG_PRINT:
print("output tensor shape: ", tensor_shape)
# vvv BROADCASTING vvv
# select tensors to be broadcasted
# TODO: enable broadcasting when we fully support it.
# num_broadcasted_tensors = np.random.randint(0, num_tensor)
num_broadcasted_tensors = np.random.randint(0, 1)
# we leave at least one tensor not broadcasted
    broadcasted_tensors_indices = np.random.choice(np.arange(num_tensor),
                                                   num_broadcasted_tensors,
                                                   replace=False)
# vvv PREPARING TENSORS vvv
tensor_list = []
for i in range(num_tensor):
if i in broadcasted_tensors_indices:
# get broadcast-compatible shape:
# Note that we are not playing with stride here, as stride doesn't affect
# codegen meaningfully.
compatible_shape = get_broadcast_compatible_shape(tensor_shape)
tensor_list.append(torch.randn(compatible_shape, device=device, dtype=dtype) * 100)
else:
tensor_list.append(torch.randn(tensor_shape, device=device, dtype=dtype) * 100)
return seed_tensor, tensor_list
def reproString(current_seed, args):
repro_str = "python {0}".format(__file__)
if args.cuda_fuser:
repro_str += " --cuda_fuser"
if args.legacy_fuser:
repro_str += " --legacy_fuser"
if args.profiling_executor:
repro_str += " --profiling_executor"
if args.fp16:
repro_str += " --fp16"
if args.cpu:
repro_str += " --cpu"
repro_str += " --max_num_tensor {0} --max_tensor_dim {1} --max_tensor_size {2}"\
" --depth_factor {3} --seed {4} --repro_run".format(
args.max_num_tensor, args.max_tensor_dim, args.max_tensor_size,
args.depth_factor, current_seed)
return repro_str
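# a generated repro string looks like (values are illustrative): "python <this
# script> --cuda_fuser --max_num_tensor 6 --max_tensor_dim 5
# --max_tensor_size 1048576 --depth_factor 2 --seed 12345 --repro_run"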
################################################################################
# test drivers: run a random topology test for a given seed
################################################################################
def runDefaultTestWithSeed(seed):
# prepare input tensors
seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
MAX_TENSOR,
MAX_TENSOR_DIM,
MAX_TENSOR_SIZE,
DEBUG_TENSOR,
DEVICE,
DTYPE)
o = random_topology_test(seed_tensor, *tensor_list)
traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
jit_o = traced_model(seed_tensor, *tensor_list) # possible profiling run
jit_o = traced_model(seed_tensor, *tensor_list)
validate_o = zip(o, jit_o)
for oo, jit_oo in validate_o:
if not oo.allclose(jit_oo, atol=1e-5, equal_nan=True):
return False
return True
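# minimal sketch of standalone usage (assumes a CUDA-capable PyTorch build):
#   if not runDefaultTestWithSeed(45589):
#       print("eager and traced outputs diverged")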
def runTest(seed, args):
# prepare input tensors
seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
args.max_num_tensor,
args.max_tensor_dim,
args.max_tensor_size,
args.debug_tensor,
"cuda" if not args.cpu else "cpu",
torch.float32 if not args.fp16 else torch.float16)
# vvv run random generated topo test in eager vvv
try:
if DEBUG_PRINT:
print("seed tensor: ", seed_tensor)
o = random_topology_test(seed_tensor, *tensor_list)
if DEBUG_PRINT:
for out in o:
print("val size: ", out.size())
except Exception as err:
raise Exception("Testing script failure with error message, repro by running:\n"
f"\t{reproString(seed, args)}") from err
try:
traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
if DEBUG_PRINT:
print("original graph: ", traced_model.graph)
jit_o = traced_model(seed_tensor, *tensor_list) # possible profiling run
jit_o = traced_model(seed_tensor, *tensor_list)
if DEBUG_PRINT:
print("optimized graph: ", traced_model.graph_for(seed_tensor, *tensor_list))
validate_o = zip(o, jit_o)
for oo, jit_oo in validate_o:
if not oo.allclose(jit_oo, equal_nan=True):
print("eager output: ", oo)
print("jit output: ", jit_oo)
print("diff ", jit_oo - oo)
raise WrongResultException()
except WrongResultException as err:
raise Exception("cuda fuser gives wrong results, repro by running:\n"
f"\t{reproString(seed, args)}") from err
except Exception as err:
raise Exception("something in cuda fuser went wrong, repro by running:\n"
f"\t{reproString(seed, args)}") from err
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--cuda_fuser", action='store_true', default=True)
parser.add_argument("--legacy_fuser", action='store_true', default=False)
parser.add_argument("--profiling_executor", action='store_true', default=False)
parser.add_argument("--fp16", action='store_true', default=False)
parser.add_argument("--cpu", action='store_true', default=False)
parser.add_argument("--debug_print", action='store_true', default=False)
parser.add_argument("--debug_tensor", action='store_true', default=False)
parser.add_argument("--max_num_tensor", default=MAX_TENSOR, type=int)
parser.add_argument("--max_tensor_dim", default=MAX_TENSOR_DIM, type=int)
parser.add_argument("--max_tensor_size", default=MAX_TENSOR_SIZE, type=int)
parser.add_argument("--depth_factor", default=GRAPH_FACTOR, type=int)
parser.add_argument("--seed", default=45589, type=int)
group = parser.add_mutually_exclusive_group()
group.add_argument("--iterations", default=4, type=int)
group.add_argument("--repro_run", action='store_true', default=False)
return parser.parse_args()
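# example invocations (script name is a placeholder):
#   python random_topo_test.py --seed 1234 --iterations 10   # fuzz 10 random seeds
#   python random_topo_test.py --seed 1234 --repro_run       # re-run a single seed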
if __name__ == '__main__':
args = parse_args()
# Register CUDA fuser
if args.cuda_fuser:
torch._C._jit_set_nvfuser_enabled(True)
# Turn off legacy fuser
if not args.legacy_fuser:
torch._C._jit_override_can_fuse_on_cpu(False)
torch._C._jit_override_can_fuse_on_gpu(False)
# Turn off profiling executor
if not args.profiling_executor:
torch._C._jit_set_profiling_executor(False)
        torch._C._set_graph_executor_optimize(False)
    # factor that roughly controls the depth of the generated graph
GRAPH_FACTOR = args.depth_factor
# debug print
DEBUG_PRINT = args.debug_print
if args.repro_run:
runTest(args.seed, args)
else:
np.random.seed(args.seed)
failing_repros = []
for seed in np.random.randint(0, args.seed, args.iterations):
try:
runTest(seed, args)
except Exception as e:
failing_repros.append(str(e))
if len(failing_repros) == 0:
print("test passed")
else:
print("{0} out of {1} tests failed;".format(
len(failing_repros), args.iterations))
print("To repro failing tests, run\n")
for repro in failing_repros:
print(repro)