File: random_topo_test.py

import torch
import numpy as np
import argparse

from typing import Dict

# debug print
DEBUG_PRINT = False

################################################################################
# configuration for random tests setup
################################################################################
# maximum number of tensors as inputs
MAX_TENSOR = 6
# maximum tensor rank
MAX_TENSOR_DIM = 5
# maximum tensor size
MAX_TENSOR_SIZE = 2**20
# use a size 1 tensor for debug
DEBUG_TENSOR = False
# tensor device
DEVICE = "cuda"
# data type for tensors
DTYPE = torch.float
# factor that roughly controls the depth of the model
GRAPH_FACTOR = 2

################################################################################
# helper functions
################################################################################


class WrongResultException(Exception):
    pass

# randomly reduce tensor_shape while keeping it broadcast-compatible
# two things are done here:
#   1. trim leading dimensions;
#   2. randomly clamp some of the remaining dimensions to size 1;


def get_broadcast_compatible_shape(tensor_shape):
    max_dim = len(tensor_shape)
    num_b_dims = np.random.randint(0, max_dim + 1)
    trim_head = np.random.randint(0, min(num_b_dims + 1, max_dim))

    shape = tensor_shape[trim_head:max_dim]
    for i in np.random.choice(range(max_dim - trim_head),
                              num_b_dims - trim_head,
                              replace=False):
        shape[i] = 1
    return shape
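# Illustrative sketch only (the numbers below are hypothetical draws, not
# fixed outputs): for tensor_shape == [4, 3, 2], drawing num_b_dims == 2 and
# trim_head == 1 first trims the shape to [3, 2], then clamps one surviving
# dimension to size 1, yielding e.g. [3, 1] -- which still broadcasts against
# [4, 3, 2] under the usual right-aligned broadcasting rules.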

# generate a random topology from the given seed and the module-level flags


def random_topology_test(seed, *inp_tensor_list):
    np.random.seed(int(seed.numpy().tolist()))
    tensor_list = [*inp_tensor_list]
    num_tensor = len(tensor_list)

    # randomly generate the pool of constant scalars available to operations
    num_const = np.random.randint(0, num_tensor + 1)
    const_list = np.random.random(num_const)

    if DEBUG_PRINT:
        for const_item in const_list:
            print("----- real number {:.10f}", const_item)

    # we require all tensors to end up in a single dependency set
    def get_root(x, dependency_map):
        if x in dependency_map:
            return get_root(dependency_map[x], dependency_map)
        else:
            return x
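    # A hypothetical walk-through of get_root, which is the "find" half of a
    # minimal union-find (no path compression): follow parents in
    # dependency_map until an unmapped node is reached. With
    # dependency_map == {0: 2, 1: 2, 2: 5}, get_root(0, dependency_map)
    # follows 0 -> 2 -> 5 and returns 5.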
    d_map: Dict[int, int] = {}
    num_sets = num_tensor
    candidate = list(range(num_tensor))

    unary_operations = [torch.sigmoid, torch.relu]
    binary_operations = [torch.add, torch.sub, torch.mul]
    u_op_size = len(unary_operations)
    b_op_size = len(binary_operations)

    num_operations = np.random.randint(num_sets - 1,
                                       num_sets * GRAPH_FACTOR)
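    # np.random.randint's upper bound is exclusive, so this draws from
    # [num_sets - 1, num_sets * GRAPH_FACTOR); num_sets - 1 is exactly the
    # number of binary joins needed to merge all dependency sets into one,
    # and the while loop below keeps running past this budget until a single
    # set remains. Worked example: num_sets == 4 with GRAPH_FACTOR == 2
    # draws from [3, 8).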

    ret_list = []

    while num_operations >= 0 or num_sets > 1:
        # start off by randomly picking a candidate and an operation
        index = np.random.randint(0, len(candidate))
        op_index = np.random.randint(0, u_op_size + b_op_size)
        lh_index = candidate[index]
        rh_index = None
        out_tensor = None

        if DEBUG_PRINT:
            print("iteration {0}, num_sets{1}, candidates {2}, tensor_list {3}, lh_index {4}, op_index {5}".format(
                num_operations, num_sets, candidate, len(tensor_list), lh_index, op_index))
        if num_operations >= 0:
            num_operations -= 1
            if op_index < u_op_size:
                # unary operation: just apply it to the candidate tensor
                out_tensor = unary_operations[op_index](tensor_list[lh_index])
            else:
                # binary operation: randomly choose the other operand:
                #   1. tensor-tensor operation -> rh_index is set
                #   2. tensor-constant operation
                # the other operand is not restricted to candidate tensors.
                op_2_index = np.random.randint(0, len(tensor_list) + num_const)

                if op_2_index < len(tensor_list):
                    if op_2_index == lh_index:
                        # if we happened to pick the candidate itself,
                        # just move on to another tensor
                        op_2_index = (op_2_index + 1) % len(tensor_list)
                    # [if rh_index: create binary operator output tensor]
                    rh_index = op_2_index
                else:
                    left = tensor_list[lh_index]
                    right = const_list[op_2_index - len(tensor_list)]
                    # if np.random.randint(0, 2) > 0:
                    #  left = const_list[op_2_index - len(tensor_list)]
                    #  right = tensor_list[lh_index]
                    out_tensor = binary_operations[op_index - u_op_size](left, right)
                if DEBUG_PRINT:
                    print("binary, op_2_index {0}, rh_index ?{1}".format(op_2_index, rh_index))
        else:
            # binary operation: randomly pick two candidates.
            # this is not the most efficient way to close dependency sets,
            # since the two chosen candidates may already be connected
            cand_index = np.random.randint(0, len(candidate))
            if cand_index == index:
                cand_index = (cand_index + 1) % len(candidate)
            # [if rh_index: create binary operator output tensor]
            rh_index = candidate[cand_index]
            if DEBUG_PRINT:
                print("binary rh_index ?{0}".format(rh_index))

        # the candidate update must happen before rh_index is removed below
        candidate[index] = len(tensor_list)
        lh_root = get_root(lh_index, d_map)
        # [if rh_index: create binary operator output tensor]
        if rh_index is not None:

            # note: when coming from the set-closing branch above, op_index
            # may fall in the unary range; the resulting negative index then
            # wraps around within binary_operations, still picking a binary op
            out_tensor = binary_operations[op_index - u_op_size](
                tensor_list[lh_index],
                tensor_list[rh_index])

            # remove rh_index from candidate if it is used
            if rh_index in candidate:
                # list.remove removes by value, not by index
                candidate.remove(rh_index)

            # check if we join dependency sets:
            rh_root = get_root(rh_index, d_map)
            if lh_root != rh_root:
                num_sets -= 1
                # update dependency; d_map[rh_root] needs no update when
                # both sides already point to the same root
                d_map[rh_root] = len(tensor_list)

        # whether or not sets were joined, point lh_root at the new tensor
        d_map[lh_root] = len(tensor_list)

        # append the output; candidate[index] already points at this slot,
        # which avoids applying an identical operation to the same tensor(s)
        tensor_list.append(out_tensor)

    # TODO: we should mark
    #       union(random_sample(tensor_list[num_tensor:]), candidate) as
    #       outputs, which would ensure we have no dead branches and a
    #       connected computation graph. However, it won't work easily if
    #       we have broadcast.
    # I have disabled broadcast for now to focus on the topology test.
    for ind in candidate:
        ret_list.append(tensor_list[ind])

    out_list = np.random.choice(
        range(num_tensor, len(tensor_list)),
        np.random.randint(0, len(tensor_list) - num_tensor),
        False)
    for ind in out_list:
        if ind not in candidate:
            ret_list.append(tensor_list[ind])

    if DEBUG_PRINT:
        print("ended with tensor_list: {0}".format(len(tensor_list)))

    return tuple(ret_list)


def prepareInputTensorsToRandomTopoTest(seed,
                                        max_tensor_num,
                                        max_tensor_dim,
                                        max_tensor_size,
                                        debug_tensor,
                                        device,
                                        dtype):
    # set seed to numpy as well as torch (note: seed must be positive, since
    # np.random.randint(0, seed) below needs a non-empty range)
    np.random.seed(seed)
    torch.manual_seed(np.random.randint(0, seed))

    # seed to pass to torch.jit.trace
    seed_tensor = torch.tensor(np.random.randint(0, seed))

    # random number of input tensors
    num_tensor = np.random.randint(1, max_tensor_num)

    # prepare randomized tensor shape
    tensor_dim = np.random.randint(1, max_tensor_dim)
    tensor_shape = []
    numel = 1
    if debug_tensor:
        tensor_shape.append(1)
    else:
        for i in range(tensor_dim):
            size_i = np.random.randint(1, int(max_tensor_size / numel / (2**(tensor_dim - i))))
            size_i = min(size_i, 128 + size_i % 128)
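            # Worked example of this cap (hypothetical draw): for
            # 128 <= s <= 255 we have 128 + s % 128 == s, so such draws pass
            # through unchanged, while a larger draw folds back into
            # [128, 255], e.g. 1000 -> min(1000, 128 + 1000 % 128) == 232;
            # every dimension therefore ends up at most 255.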
            tensor_shape.insert(0, size_i)
            numel *= size_i

    if DEBUG_PRINT:
        print("output tensor shape: ", tensor_shape)

    # vvv BROADCASTING vvv
    # select tensors to be broadcasted
    # TODO: enable broadcasting when we fully support it.
    # num_broadcasted_tensors = np.random.randint(0, num_tensor)
    num_broadcasted_tensors = np.random.randint(0, 1)
    # we leave at least one tensor not broadcasted
    broadcasted_tensors_indices = np.random.choice(torch.arange(num_tensor),
                                                   num_broadcasted_tensors,
                                                   replace=False)

    # vvv PREPARING TENSORS vvv
    tensor_list = []
    for i in range(num_tensor):
        if i in broadcasted_tensors_indices:
            # get broadcast-compatible shape:
            # Note that we are not playing with stride here, as stride doesn't affect
            # codegen meaningfully.
            compatible_shape = get_broadcast_compatible_shape(tensor_shape)
            tensor_list.append(torch.randn(compatible_shape, device=device, dtype=dtype) * 100)
        else:
            tensor_list.append(torch.randn(tensor_shape, device=device, dtype=dtype) * 100)
    return seed_tensor, tensor_list


def reproString(current_seed, args):
    repro_str = "python {0}".format(__file__)
    if args.cuda_fuser:
        repro_str += " --cuda_fuser"
    if args.legacy_fuser:
        repro_str += " --legacy_fuser"
    if args.profiling_executor:
        repro_str += " --profiling_executor"
    if args.fp16:
        repro_str += " --fp16"
    if args.cpu:
        repro_str += " --cpu"
    repro_str += " --max_num_tensor {0} --max_tensor_dim {1} --max_tensor_size {2}"\
        " --depth_factor {3} --seed {4} --repro_run".format(
            args.max_num_tensor, args.max_tensor_dim, args.max_tensor_size,
            args.depth_factor, current_seed)
    return repro_str
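# Example of the produced repro string (hypothetical argument values; the
# real path comes from __file__, and the actual string is a single line):
#   python random_topo_test.py --cuda_fuser --max_num_tensor 6 \
#       --max_tensor_dim 5 --max_tensor_size 1048576 --depth_factor 2 \
#       --seed 12345 --repro_run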

################################################################################
# global seed to repro the test
################################################################################


def runDefaultTestWithSeed(seed):
    # prepare input tensors
    seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
                                                                   MAX_TENSOR,
                                                                   MAX_TENSOR_DIM,
                                                                   MAX_TENSOR_SIZE,
                                                                   DEBUG_TENSOR,
                                                                   DEVICE,
                                                                   DTYPE)
    o = random_topology_test(seed_tensor, *tensor_list)
    traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
    jit_o = traced_model(seed_tensor, *tensor_list)  # possible profiling run
    jit_o = traced_model(seed_tensor, *tensor_list)
    validate_o = zip(o, jit_o)
    for oo, jit_oo in validate_o:
        if not oo.allclose(jit_oo, atol=1e-5, equal_nan=True):
            return False
    return True
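# Minimal usage sketch (assumes a CUDA-capable PyTorch build, since DEVICE
# defaults to "cuda"; the seed value here is arbitrary):
#   if not runDefaultTestWithSeed(45589):
#       print("eager and traced outputs diverged")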


def runTest(seed, args):
    # prepare input tensors
    seed_tensor, tensor_list = prepareInputTensorsToRandomTopoTest(seed,
                                                                   args.max_num_tensor,
                                                                   args.max_tensor_dim,
                                                                   args.max_tensor_size,
                                                                   args.debug_tensor,
                                                                   "cuda" if not args.cpu else "cpu",
                                                                   torch.float32 if not args.fp16 else torch.float16)

    # vvv run randomly generated topo test in eager mode vvv
    try:
        if DEBUG_PRINT:
            print("seed tensor: ", seed_tensor)
        o = random_topology_test(seed_tensor, *tensor_list)
        if DEBUG_PRINT:
            for out in o:
                print("val size: ", out.size())
    except Exception as err:
        raise Exception("Testing script failure with error message, repro by running:\n"
                        f"\t{reproString(seed, args)}") from err
    try:
        traced_model = torch.jit.trace(random_topology_test, (seed_tensor, *tensor_list))
        if DEBUG_PRINT:
            print("original graph: ", traced_model.graph)
        jit_o = traced_model(seed_tensor, *tensor_list)  # possible profiling run
        jit_o = traced_model(seed_tensor, *tensor_list)
        if DEBUG_PRINT:
            print("optimized graph: ", traced_model.graph_for(seed_tensor, *tensor_list))

        validate_o = zip(o, jit_o)
        for oo, jit_oo in validate_o:
            if not oo.allclose(jit_oo, equal_nan=True):
                print("eager output: ", oo)
                print("jit output: ", jit_oo)
                print("diff ", jit_oo - oo)
                raise WrongResultException()
    except WrongResultException as err:
        raise Exception("cuda fuser gives wrong results, repro by running:\n"
                        f"\t{reproString(seed, args)}") from err
    except Exception as err:
        raise Exception("something in cuda fuser went wrong, repro by running:\n"
                        f"\t{reproString(seed, args)}") from err


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--cuda_fuser", action='store_true', default=True)
    parser.add_argument("--legacy_fuser", action='store_true', default=False)
    parser.add_argument("--profiling_executor", action='store_true', default=False)
    parser.add_argument("--fp16", action='store_true', default=False)
    parser.add_argument("--cpu", action='store_true', default=False)
    parser.add_argument("--debug_print", action='store_true', default=False)
    parser.add_argument("--debug_tensor", action='store_true', default=False)
    parser.add_argument("--max_num_tensor", default=MAX_TENSOR, type=int)
    parser.add_argument("--max_tensor_dim", default=MAX_TENSOR_DIM, type=int)
    parser.add_argument("--max_tensor_size", default=MAX_TENSOR_SIZE, type=int)
    parser.add_argument("--depth_factor", default=GRAPH_FACTOR, type=int)
    parser.add_argument("--seed", default=45589, type=int)
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--iterations", default=4, type=int)
    group.add_argument("--repro_run", action='store_true', default=False)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()

    # Register CUDA fuser
    if args.cuda_fuser:
        torch._C._jit_set_nvfuser_enabled(True)

    # Turn off legacy fuser
    if not args.legacy_fuser:
        torch._C._jit_override_can_fuse_on_cpu(False)
        torch._C._jit_override_can_fuse_on_gpu(False)

    # Turn off profiling executor
    if not args.profiling_executor:
        torch._C._jit_set_profiling_executor(False)
        torch._C._get_graph_executor_optimize(False)

    # factor that roughly controls the depth of the model
    GRAPH_FACTOR = args.depth_factor
    # debug print
    DEBUG_PRINT = args.debug_print

    if args.repro_run:
        runTest(args.seed, args)
    else:
        np.random.seed(args.seed)
        failing_repros = []
        for seed in np.random.randint(0, args.seed, args.iterations):
            try:
                runTest(seed, args)
            except Exception as e:
                failing_repros.append(str(e))
        if len(failing_repros) == 0:
            print("test passed")
        else:
            print("{0} out of {1} tests failed;".format(
                  len(failing_repros), args.iterations))
            print("To repro failing tests, run\n")
            for repro in failing_repros:
                print(repro)