# Copyright 2019 Kakao Brain
#
# Copyright (c) Facebook, Inc. and its affiliates. All rights reserved.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
"""A helper to roughly balance a sequential module.
Usage::
import torch
from torch.distributed.pipeline.sync import Pipe
from torch.distributed.pipeline.sync.balance import balance_by_time
sample = torch.empty(128, 3, 224, 224)
balance = balance_by_time(torch.cuda.device_count(), model, sample)
pipe = Pipe(model, balance, chunks=8)
"""
from typing import Any, List, Union, Sequence

import torch
from torch import Tensor
import torch.nn as nn

from . import blockpartition
from .profile import profile_sizes, profile_times

__all__ = ["balance_by_time", "balance_by_size"]


Device = Union[torch.device, int, str]

Tensors = Sequence[Tensor]
TensorOrTensors = Union[Tensor, Tensors]


def balance_cost(cost: List[int], partitions: int) -> List[int]:
    """Splits a sequence of per-layer costs into the given number of
    partitions and returns the number of layers in each partition."""
    partitioned = blockpartition.solve(cost, partitions)
    return [len(p) for p in partitioned]
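

# An illustrative sketch (not executed by the library) of what ``balance_cost``
# computes, assuming ``blockpartition.solve`` splits the cost sequence into
# consecutive blocks with roughly equal total cost:
#
#     blockpartition.solve([1, 2, 3, 4, 5, 6], partitions=2)
#     # -> [[1, 2, 3, 4], [5, 6]]   (block sums 10 and 11)
#
#     balance_cost([1, 2, 3, 4, 5, 6], partitions=2)
#     # -> [4, 2]   (4 layers in the first partition, 2 in the second)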


def balance_by_time(
    partitions: int,
    module: nn.Sequential,
    sample: Union[List[Any], Tensor],
    *,
    timeout: float = 1.0,
    device: Device = torch.device("cuda"),
) -> List[int]:
"""Naive automatic balancing by elapsed time per layer.
::
sample = torch.empty(128, 3, 224, 224)
balance = balance_by_time(torch.cuda.device_count(), model, sample)
pipe = Pipe(model, balance, chunks=8)
Args:
partitions (int):
intended number of partitions
module (torch.nn.Sequential):
sequential module to be partitioned
sample (torch.Tensor):
example input with arbitrary batch size
Keyword Args:
timeout (float):
profiling iterates again if the timeout (in second) is not exceeded
(default: ``1.0``)
device ('cpu' or 'cuda' device):
CPU or CUDA device where each layer is profiled (default: the
current CUDA device)
Returns:
A list of number of layers in each partition. Use it for the `balance`
parameter of :class:`~torchpipe.Pipe`.
.. note::
`module` and `sample` must be placed on the same device.
"""
    times = profile_times(module, sample, timeout, torch.device(device))
    return balance_cost(times, partitions)
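

# A minimal usage sketch for ``balance_by_time``; the model architecture and
# tensor shapes below are illustrative assumptions, not part of this module.
# Note that the model and the sample sit on the same device, as required:
#
#     model = nn.Sequential(
#         nn.Conv2d(3, 64, 3, padding=1),
#         nn.ReLU(),
#         nn.Conv2d(64, 64, 3, padding=1),
#         nn.ReLU(),
#     ).to("cuda")
#     sample = torch.empty(128, 3, 224, 224, device="cuda")
#     balance = balance_by_time(torch.cuda.device_count(), model, sample)
#     # e.g. balance == [2, 2] on a 2-GPU machine: 2 layers per partition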


def balance_by_size(
    partitions: int,
    module: nn.Sequential,
    input: Union[List[Any], Tensor],
    *,
    chunks: int = 1,
    param_scale: float = 2.0,
    device: Device = torch.device("cuda"),
) -> List[int]:
"""Naive automatic balancing by CUDA memory usage per layer.
During training, required memory for parameters depends on which optimizer
is used. Optimizers may use buffers for each parameter to track
optimization statistics internally, such as momentum buffer in SGD.
To get more reliable size based balance, you should specify `param_scale`
with regard to your optimizer. The default `param_scale` is 2 instead of 1
due to gradient accumulation which is necessary for every optimizer.
Follow this guide to choose correct `param_scale` for typical optimizers:
========= ============= =========================================
Optimizer `param_scale` Internal State
========= ============= =========================================
SGD 2--3 (momentum_buffer)
Adam 4--5 exp_avg, exp_avg_sq, (max_exp_avg_sq)
Adadelta 4 square_avg, acc_delta
Adagrad 3 sum
RMSprop 3--5 square_avg, (momentum_buffer), (grad_avg)
========= ============= =========================================
Here's a simple example with the Adam optimizer::
balance = balance_by_size(
torch.cuda.device_count(),
model,
# Same size with mini-batch to train
torch.empty(1024, 3, 224, 224),
# Number of micro-batches to train with Pipe
chunks=8,
# 4 for Adam
param_scale=4.0,
)
pipe = Pipe(model, balance, chunks=8)
adam = Adam(pipe.parameters())
Args:
partitions (int):
intended number of partitions
module (torch.nn.Sequential):
sequential module to be partitioned
input (torch.Tensor):
example mini-batch with the same size to train
Keyword Args:
chunks (int):
number of micro-batches will be used to train (default: ``1``)
param_scale (float):
how many copies of parameters would be allocated for training. It
depends on optimizer. See the above guide. (default: ``2.0``)
device ('cuda' device):
CUDA device where each layer is profiled (default: the current CUDA
device)
Returns:
A list of number of layers in each partition. Use it for the `balance`
parameter of :class:`~torchpipe.Pipe`.
.. note::
`module` and `input` must be placed on the same CUDA device.
"""
    sizes = profile_sizes(module, input, chunks, param_scale, torch.device(device))
    return balance_cost(sizes, partitions)
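

# An end-to-end sketch combining ``balance_by_size`` with ``Pipe`` and Adam.
# Here ``model`` is assumed to be a CUDA-resident ``nn.Sequential``; ``chunks``
# should match between profiling and ``Pipe`` so that the measured peak memory
# reflects the actual micro-batch size:
#
#     from torch.optim import Adam
#     from torch.distributed.pipeline.sync import Pipe
#
#     input = torch.empty(1024, 3, 224, 224, device="cuda")
#     balance = balance_by_size(
#         torch.cuda.device_count(), model, input, chunks=8, param_scale=4.0,
#     )
#     pipe = Pipe(model, balance, chunks=8)
#     adam = Adam(pipe.parameters())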