"""
A simple tool to compare the performance of different impls of
DistributedDataParallel on resnet50, three flavors:
1. DistributedDataParallel, which has a python wrapper and C++ core to do
gradient distribution and reduction. It's current production version.
2. PythonDDP with async gradient reduction.
3. PythonDDP with synchrous gradient reduction.
Example::
>>> modify configs in main func
>>> python compare_ddp.py
>>> Sample out: compare_ddp_sample.md
"""
import numpy as np
import os
import pickle
import glob
import python_ddp
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from collections import OrderedDict
from enum import Enum
from tabulate import tabulate
from torch.nn.parallel import DistributedDataParallel as DDP
class DDPOption(Enum):
DDP_CPP_CORE = 1
PYTHON_DDP_SYNC_REDUCTION = 2
PYTHON_DDP_ASYNC_REDUCTION = 3
class LatencyData:
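    """Latency metrics collected from a single rank for one experiment run."""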
__slots__ = ["buffer_size_in_M", "ddp_option", "rank", "metrics"]
def __init__(self, buffer_size_in_M, ddp_option, rank, metrics):
self.buffer_size_in_M = buffer_size_in_M
self.ddp_option = ddp_option
self.rank = rank
self.metrics = metrics
def serialize(buffer_size_in_M, ddp_option, rank, metrics,
data_dir="./tmp", ext="ddpraw"):
if not os.path.exists(data_dir):
        print(f'{data_dir} does not exist, mkdir {data_dir}')
os.mkdir(data_dir)
file_name = "buffer_size_{}M_rank{}_{}.{}".format(
buffer_size_in_M, rank, ddp_option, ext)
file_path = os.path.join(data_dir, file_name)
print("Writing metrics to file: '{}'".format(file_path))
data = LatencyData(buffer_size_in_M, ddp_option, rank, metrics)
with open(file_path, "wb") as f:
pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
print(f"Wrote metrics to '{file_path}''")
def load_detailed_metrics(data_dir="./tmp", ext="ddpraw"):
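    """Loads all serialized metrics files and groups them by buffer size."""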
assert os.path.exists(data_dir)
file_pattern = os.path.join(data_dir, f"*.{ext}")
files = glob.glob(file_pattern)
print("load_detailed_metrics found {} files".format(len(files)))
buffer_size_to_metrics = OrderedDict()
for file_path in files:
with open(file_path, "rb") as f:
data = pickle.load(f)
# Add data to buffer_size_to_metrics
buffer_size = data.buffer_size_in_M
if buffer_size not in buffer_size_to_metrics:
buffer_size_to_metrics[buffer_size] = {}
metrics = buffer_size_to_metrics.get(buffer_size)
assert metrics is not None
metrics[data.ddp_option] = data.metrics
return buffer_size_to_metrics
def setup(rank, world_size):
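    """Initializes the default process group for one worker process."""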
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def create_ddp_model(module, rank, pg, ddp_option, buffer_size_in_M):
"""Helper to create DDPModel. """
if ddp_option == DDPOption.DDP_CPP_CORE:
ddp_model = DDP(module, device_ids=[rank],
process_group=pg,
bucket_cap_mb=buffer_size_in_M)
ddp_model._set_static_graph()
return ddp_model
    elif ddp_option in (DDPOption.PYTHON_DDP_SYNC_REDUCTION,
                        DDPOption.PYTHON_DDP_ASYNC_REDUCTION):
        async_reduction = ddp_option == DDPOption.PYTHON_DDP_ASYNC_REDUCTION
        M = 2 ** 20
        return python_ddp.PythonDDP(module, pg, async_reduction,
                                    buffer_size=buffer_size_in_M * M)
else:
raise NotImplementedError
def run_ddp(rank, world_size, epochs, ddp_option, buffer_size_in_M, warmup_iterations=20):
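    """Runs synthetic resnet50 training on one rank and records forward/backward latency."""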
print(f'Invoked run_ddp rank {rank}')
assert epochs > warmup_iterations
# Setup
print("setting up ... ")
setup(rank, world_size)
torch.manual_seed(rank)
torch.cuda.manual_seed(rank)
device = torch.device('cuda:%d' % rank)
print('setup done')
# Create ResNet50 module and wrap in DDP module.
pg = dist.distributed_c10d._get_default_group()
model = models.resnet50().to(device)
ddp_model = create_ddp_model(model, rank, pg, ddp_option, buffer_size_in_M)
assert ddp_model is not None
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    # Container to hold: step -> list of latencies in milliseconds
MODEL_FORWARD = "forward"
MODEL_BACKWARD = "backward"
metrics = {MODEL_FORWARD: [], MODEL_BACKWARD: []}
for epoch in range(epochs):
if epoch % 10 == 0:
print(f'Epoch {epoch}/{epochs} ...')
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
# TODO(bowangbj): Switch to real training set from ImageNet.
inputs = torch.rand([32, 3, 224, 224], device=device)
labels = torch.rand([32, 1000], device=device)
# Forward
start.record()
outputs = ddp_model(inputs)
loss = loss_fn(outputs, labels)
end.record()
torch.cuda.synchronize()
if epoch >= warmup_iterations:
metrics[MODEL_FORWARD].append(start.elapsed_time(end))
# Backward
start.record()
loss.backward()
        # Reduce all grads. This is needed for the non-DDP_CPP_CORE options
        # since the hook for all_reduce does not exist yet.
if ddp_option != DDPOption.DDP_CPP_CORE:
ddp_model.all_reduce_grads()
end.record()
torch.cuda.synchronize()
if epoch >= warmup_iterations:
metrics[MODEL_BACKWARD].append(start.elapsed_time(end))
# Optimization
optimizer.step()
optimizer.zero_grad()
if rank == 0:
print(f"\nMetrics for GPU {rank}, ddp_option={ddp_option}, buffer_size={buffer_size_in_M}M")
print(f"Skipped {warmup_iterations} CUDA warmpup iterations. ")
for step, elapsed_milliseconds in metrics.items():
A = np.array(elapsed_milliseconds)
            print(' {N} iterations, {step}, mean={mean:.3f} ms, median={median:.3f} ms, p90={p90:.3f} ms, p99={p99:.3f} ms'.format(
                N=len(A), step=step, mean=np.mean(A),
                median=np.percentile(A, 50), p90=np.percentile(A, 90),
                p99=np.percentile(A, 99)))
    # Serialize the raw data to be used to compute the summary. We don't
    # maintain a global object holding the metrics because mp.spawn pickles
    # all the arguments before spawning new processes, so it's infeasible to
    # save global state in an object.
serialize(buffer_size_in_M, ddp_option, rank, metrics)
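    # Tear down the process group so each spawned worker exits cleanly.
    dist.destroy_process_group()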
def append_delta(row_list, base, exp):
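    """Appends the percentage delta of ``exp`` relative to ``base`` to ``row_list``."""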
percent = 100 * ((exp - base) / base)
row_list.append(percent)
def print_summary(buffer_size_to_metrics):
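    """Prints one latency comparison table per buffer size, with DDP_CPP_CORE as the baseline."""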
    # buffer_size_to_metrics: buffer_size -> {ddp_option: metrics}
    # metrics: step -> [latency in milliseconds]
for buffer_size, metrics in buffer_size_to_metrics.items():
assert DDPOption.DDP_CPP_CORE in metrics.keys()
baseline = metrics.get(DDPOption.DDP_CPP_CORE)
print(f"=== Summary for buffer_size: {buffer_size}M === ")
for step in baseline.keys():
            # step takes its value from [forward, backward].
            # Compute the latency for each step into a table; each row looks like
            # [option, mean, delta%, p50, delta%, p90, delta%, p95, delta%, p99, delta%]
data = []
baseline_latencies = baseline.get(step)
assert baseline_latencies is not None
A_baseline = np.array(baseline_latencies)
for ddp_option, exp_metrics in metrics.items():
exp_latencies = exp_metrics.get(step)
assert exp_latencies is not None
A_exp = np.array(exp_latencies)
# Yield option, mean, p50, p90, p95, p99 and delta.
row = [ddp_option]
row.append(np.mean(A_exp))
append_delta(row, np.mean(A_baseline), np.mean(A_exp))
for px in [50, 90, 95, 99]:
base = np.percentile(A_baseline, px)
exp = np.percentile(A_exp, px)
row.append(exp)
append_delta(row, base, exp)
data.append(row)
# Output buffer_size, step as a table.
            print(tabulate(data,
                           headers=[f"DDP: [{step}]", "mean", "delta%",
                                    "p50", "delta%", "p90", "delta%",
                                    "p95", "delta%", "p99", "delta%"]))
print("\n")
def main():
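    """Sweeps bucket sizes and DDP options, then prints latency summaries."""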
world_size = 2
epochs = 120
# resnet50 model facts:
# total_param_count = 161
# total_elements = 25557032 ~= 24.37M
# param_max_elements = 2359296 ~= 2.25M
# Try different bucket sizes.
    buffer_size_in_mbs = list(range(3, 28))
print("buffer_size_in_mbs: " + str(buffer_size_in_mbs))
for buffer_size_in_M in buffer_size_in_mbs:
print("\n\n=== NEW EXPERIMENT: buffer_size={}M, {} epochs, world_size={} ===".format(
buffer_size_in_M, epochs, world_size))
options = [
DDPOption.DDP_CPP_CORE,
DDPOption.PYTHON_DDP_ASYNC_REDUCTION,
DDPOption.PYTHON_DDP_SYNC_REDUCTION
]
for option in options:
print("Measuring option: {} ... ".format(option))
mp.spawn(run_ddp,
args=(world_size, epochs, option, buffer_size_in_M),
nprocs=world_size,
join=True)
print("\n Generating summaries ... ")
buffer_size_to_metrics = load_detailed_metrics(data_dir="./tmp", ext="ddpraw")
print_summary(buffer_size_to_metrics)
if __name__ == "__main__" :
main()