#include <torch/csrc/distributed/c10d/default_comm_hooks.hpp>

#include <c10/core/ScalarType.h>
#include <c10/util/Exception.h>
#include <torch/csrc/distributed/c10d/Ops.hpp>
#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/csrc/distributed/c10d/comm.hpp>
#include <torch/torch.h>

namespace c10d {
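
// AllReduceCommHook: divides the bucket's flattened gradients by the process
// group size and then all-reduces them, so every rank ends up with the
// averaged gradient.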
c10::intrusive_ptr<c10::ivalue::Future> AllReduceCommHook::runHook(
    GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getBufferRef()};
  // Apply the division first to avoid overflow, especially for FP16.
  tensors[0] /= state_->getSize();
  return ops::allreduce(state_, tensors)->getFuture();
}
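
// FP16CompressCommHook: casts the bucket to float16 to halve communication
// volume, averages it via all-reduce, and then copies the reduced result back
// into the original full-precision bucket buffer.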
c10::intrusive_ptr<c10::ivalue::Future> FP16CompressCommHook::runHook(
    GradBucket& bucket) {
  auto compressed_tensor = bucket.getBufferRef().to(torch::kFloat16);
  // Apply the division first to avoid overflow.
  compressed_tensor /= state_->getSize();
  std::vector<at::Tensor> tensors = {compressed_tensor};

  auto allreduce_fut = ops::allreduce(state_, tensors)->getFuture();
  // Decompress by writing the reduced FP16 result back into the bucket's
  // original (full-precision) buffer once the all-reduce completes.
  auto decompressed_tensor = bucket.getBufferRef();
  auto decompress = [decompressed_tensor](c10::ivalue::Future& allreduce_fut) {
    auto result = allreduce_fut.value();
    TORCH_INTERNAL_ASSERT(
        result.isTensorList(),
        "ProcessGroup::allreduce should return TensorList");

    auto reduce_tensor = result.toTensorVector()[0];
    TORCH_INTERNAL_ASSERT_DEBUG_ONLY(
        reduce_tensor.scalar_type() == at::ScalarType::Half,
        "Expected reduced tensor to be fp16 in FP16CompressHook, but got type ",
        reduce_tensor.scalar_type());
    // copy_ casts the FP16 result back to the buffer's dtype.
    decompressed_tensor.copy_(reduce_tensor);
    return c10::IValue(decompressed_tensor);
  };

  return allreduce_fut->then(decompress, allreduce_fut->elementType());
}
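
// _AllReduceBySumCommHook: same as AllReduceCommHook but without the division
// by the process group size, so ranks receive the summed gradients rather
// than the average.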
c10::intrusive_ptr<c10::ivalue::Future> _AllReduceBySumCommHook::runHook(
    GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getBufferRef()};
  return ops::allreduce(state_, tensors)->getFuture();
}

} // namespace c10d