File: compare_ddp.py

"""
A simple tool to compare the performance of different implementations of
DistributedDataParallel on resnet50, in three flavors:

1. DistributedDataParallel, which has a Python wrapper and a C++ core to do
   gradient distribution and reduction. It's the current production version.

2. PythonDDP with asynchronous gradient reduction.

3. PythonDDP with synchronous gradient reduction.

Example::
    >>> modify configs in main func
    >>> python compare_ddp.py
    >>> Sample out: compare_ddp_sample.md
"""

import glob
import os
import pickle
from collections import OrderedDict
from enum import Enum

import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from tabulate import tabulate
from torch.nn.parallel import DistributedDataParallel as DDP

import python_ddp

class DDPOption(Enum):
    DDP_CPP_CORE = 1
    PYTHON_DDP_SYNC_REDUCTION = 2
    PYTHON_DDP_ASYNC_REDUCTION = 3

class LatencyData:
    __slots__ = ["buffer_size_in_M", "ddp_option", "rank", "metrics"]

    def __init__(self, buffer_size_in_M, ddp_option, rank, metrics):
        self.buffer_size_in_M = buffer_size_in_M
        self.ddp_option = ddp_option
        self.rank = rank
        self.metrics = metrics

def serialize(buffer_size_in_M, ddp_option, rank, metrics,
              data_dir="./tmp", ext="ddpraw"):
    if not os.path.exists(data_dir):
        print(f'{data_dir} does not exist, mkdir {data_dir}')
        os.mkdir(data_dir)
    file_name = "buffer_size_{}M_rank{}_{}.{}".format(
        buffer_size_in_M, rank, ddp_option, ext)
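    # e.g. 'buffer_size_8M_rank0_DDPOption.DDP_CPP_CORE.ddpraw' (illustrative values).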
    file_path = os.path.join(data_dir, file_name)
    print("Writing metrics to file: '{}'".format(file_path))
    data = LatencyData(buffer_size_in_M, ddp_option, rank, metrics)
    with open(file_path, "wb") as f:
        pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
    print(f"Wrote metrics to '{file_path}''")

def load_detailed_metrics(data_dir="./tmp", ext="ddpraw"):
    assert os.path.exists(data_dir)
    file_pattern = os.path.join(data_dir, f"*.{ext}")
    files = glob.glob(file_pattern)
    print("load_detailed_metrics found {} files".format(len(files)))
    buffer_size_to_metrics = OrderedDict()
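    # Maps buffer_size (in M) -> {ddp_option -> {"forward": [...], "backward": [...]}}.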
    for file_path in files:
        with open(file_path, "rb") as f:
            data = pickle.load(f)
        # Add data to buffer_size_to_metrics
        buffer_size = data.buffer_size_in_M
        if buffer_size not in buffer_size_to_metrics:
            buffer_size_to_metrics[buffer_size] = {}
        metrics = buffer_size_to_metrics.get(buffer_size)
        assert metrics is not None
        metrics[data.ddp_option] = data.metrics
    return buffer_size_to_metrics

def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'

    # initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def create_ddp_model(module, rank, pg, ddp_option, buffer_size_in_M):
    """Helper to create DDPModel. """
    if ddp_option == DDPOption.DDP_CPP_CORE:
        ddp_model = DDP(module, device_ids=[rank],
                        process_group=pg,
                        bucket_cap_mb=buffer_size_in_M)
        ddp_model._set_static_graph()
        return ddp_model
    elif ddp_option == DDPOption.PYTHON_DDP_SYNC_REDUCTION:
        M = 2 ** 20
        return python_ddp.PythonDDP(module, pg, False, buffer_size=buffer_size_in_M * M)
    elif ddp_option == DDPOption.PYTHON_DDP_ASYNC_REDUCTION:
        M = 2 ** 20
        return python_ddp.PythonDDP(module, pg, True, buffer_size=buffer_size_in_M * M)
    else:
        raise NotImplementedError

def run_ddp(rank, world_size, epochs, ddp_option, buffer_size_in_M, warmup_iterations=20):
    print(f'Invoked run_ddp rank {rank}')
    assert epochs > warmup_iterations

    # Setup
    print("setting up ... ")
    setup(rank, world_size)
    torch.manual_seed(rank)
    torch.cuda.manual_seed(rank)
    device = torch.device('cuda:%d' % rank)
    print('setup done')

    # Create ResNet50 module and wrap in DDP module.
    pg = dist.distributed_c10d._get_default_group()
    model = models.resnet50().to(device)
    ddp_model = create_ddp_model(model, rank, pg, ddp_option, buffer_size_in_M)
    assert ddp_model is not None

    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # Container to hold: step name -> list of latencies in milliseconds.
    MODEL_FORWARD = "forward"
    MODEL_BACKWARD = "backward"
    metrics = {MODEL_FORWARD: [], MODEL_BACKWARD: []}

    for epoch in range(epochs):
        if epoch % 10 == 0:
            print(f'Epoch {epoch}/{epochs} ...')

        start = torch.cuda.Event(enable_timing=True)
        end = torch.cuda.Event(enable_timing=True)

        # TODO(bowangbj): Switch to real training set from ImageNet.
        inputs = torch.rand([32, 3, 224, 224], device=device)
        labels = torch.rand([32, 1000], device=device)

        # Forward
        start.record()
        outputs = ddp_model(inputs)
        loss = loss_fn(outputs, labels)

        end.record()
        torch.cuda.synchronize()
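        # synchronize() ensures both events have completed before elapsed_time() is read.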
        if epoch >= warmup_iterations:
            metrics[MODEL_FORWARD].append(start.elapsed_time(end))

        # Backward
        start.record()
        loss.backward()
        # Reduce all grad, this is needed for non-DDP_CPP_CORE since the hook
        # for all_reduce does not exist yet.
        if ddp_option != DDPOption.DDP_CPP_CORE:
            ddp_model.all_reduce_grads()
        end.record()
        torch.cuda.synchronize()
        if epoch >= warmup_iterations:
            metrics[MODEL_BACKWARD].append(start.elapsed_time(end))

        # Optimization
        optimizer.step()
        optimizer.zero_grad()

    if rank == 0:
        print(f"\nMetrics for GPU {rank}, ddp_option={ddp_option}, buffer_size={buffer_size_in_M}M")
        print(f"Skipped {warmup_iterations} CUDA warmpup iterations. ")
        for step, elapsed_milliseconds in metrics.items():
            A = np.array(elapsed_milliseconds)
            print(' {N} iterations, {step}, mean={mean} ms, median={median} ms, p90={p90} ms, p99={p99} ms'.format(
                N=len(A), step=step, mean=np.mean(A),
                median=np.percentile(A, 50), p90=np.percentile(A, 90),
                p99=np.percentile(A, 99)))

        # Serialize the raw data to be used to compute the summary. We don't
        # maintain a global object holding the metrics because mp.spawn copies
        # all the arguments into each new process, so it's infeasible to collect
        # global state in a shared object.
        serialize(buffer_size_in_M, ddp_option, rank, metrics)

def append_delta(row_list, base, exp):
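    # e.g. base=10.0, exp=12.5 appends 25.0, meaning exp is 25% slower than base.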
    percent = 100 * ((exp - base) / base)
    row_list.append(percent)

def print_summary(buffer_size_to_metrics):
    # buffer_size_to_metrics: buffer_size -> {ddp_option: Metrics}
    # Metrics: step -> [latency in ms]
    for buffer_size, metrics in buffer_size_to_metrics.items():
        assert DDPOption.DDP_CPP_CORE in metrics.keys()
        baseline = metrics.get(DDPOption.DDP_CPP_CORE)
        print(f"=== Summary for buffer_size: {buffer_size}M === ")
        for step in baseline.keys():
            # step is one of ["forward", "backward"].
            # Collect the latencies for each step into a table; each row looks like
            # [option, mean, delta%, p50, delta%, p90, delta%, p95, delta%, p99, delta%].
            data = []
            baseline_latencies = baseline.get(step)
            assert baseline_latencies is not None
            A_baseline = np.array(baseline_latencies)
            for ddp_option, exp_metrics in metrics.items():
                exp_latencies = exp_metrics.get(step)
                assert exp_latencies is not None
                A_exp = np.array(exp_latencies)
                # Yield option, mean, p50, p90, p95, p99 and delta.
                row = [ddp_option]
                row.append(np.mean(A_exp))
                append_delta(row, np.mean(A_baseline), np.mean(A_exp))
                for px in [50, 90, 95, 99]:
                    base = np.percentile(A_baseline, px)
                    exp = np.percentile(A_exp, px)
                    row.append(exp)
                    append_delta(row, base, exp)
                data.append(row)

            # Output buffer_size, step as a table.
            print(tabulate(data,
                           headers=[f"DDP: [{step}]", "Mean", "delta%",
                                    "mean", "delta%", "p90", "delta%",
                                    "p95", "delta%%", "p99", "delta%"]))
            print("\n")

def main():
    world_size = 2
    epochs = 120

    # resnet50 model facts:
    # total_param_count = 161
    # total_elements = 25557032 ~= 24.37M
    # param_max_elements = 2359296 ~= 2.25M
    # Try different bucket sizes.
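    # With ~24.37M total elements, a 3M buffer splits the gradients into roughly
    # 8-9 buckets, while sizes of ~25M and above put everything into one bucket.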
    buffer_size_in_mbs = [3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27]
    print("buffer_size_in_mbs: " + str(buffer_size_in_mbs))
    for buffer_size_in_M in buffer_size_in_mbs:
        print("\n\n=== NEW EXPERIMENT: buffer_size={}M, {} epochs, world_size={} ===".format(
            buffer_size_in_M, epochs, world_size))
        options = [
            DDPOption.DDP_CPP_CORE,
            DDPOption.PYTHON_DDP_ASYNC_REDUCTION,
            DDPOption.PYTHON_DDP_SYNC_REDUCTION
        ]
        for option in options:
            print("Measuring option: {} ... ".format(option))
            mp.spawn(run_ddp,
                     args=(world_size, epochs, option, buffer_size_in_M),
                     nprocs=world_size,
                     join=True)

    print("\n Generating summaries ... ")
    buffer_size_to_metrics = load_detailed_metrics(data_dir="./tmp", ext="ddpraw")
    print_summary(buffer_size_to_metrics)

if __name__ == "__main__":
    main()