File: default_comm_hooks.cpp

package info (click to toggle)
pytorch-cuda 2.6.0%2Bdfsg-7
  • links: PTS, VCS
  • area: contrib
  • in suites: forky, sid, trixie
  • size: 161,620 kB
  • sloc: python: 1,278,832; cpp: 900,322; ansic: 82,710; asm: 7,754; java: 3,363; sh: 2,811; javascript: 2,443; makefile: 597; ruby: 195; xml: 84; objc: 68
file content (61 lines) | stat: -rw-r--r-- 2,258 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <c10/core/ScalarType.h>
#include <c10/util/Exception.h>
#include <torch/csrc/distributed/c10d/default_comm_hooks.hpp>

#include <torch/csrc/distributed/c10d/ProcessGroup.hpp>
#include <torch/csrc/distributed/c10d/comm.hpp>
#include <torch/torch.h>

namespace c10d {

c10::intrusive_ptr<c10::ivalue::Future> AllReduceCommHook::runHook(
    GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getBufferRef()};
  // Apply the division first to avoid overflow, especially for FP16.
  tensors[0] /= state_->getSize();
  return state_->allreduce(tensors)->getFuture();
}

c10::intrusive_ptr<c10::ivalue::Future> FP16CompressCommHook::runHook(
    GradBucket& bucket) {
  auto compressed_tensor = bucket.getBufferRef().to(torch::kFloat16);
  // Apply the division first to avoid overflow.
  compressed_tensor /= state_->getSize();
  std::vector<at::Tensor> tensors = {compressed_tensor};

  auto allreduce_fut = state_->allreduce(tensors)->getFuture();
  auto decompressed_tensor = bucket.getBufferRef();
  auto decompress = [decompressed_tensor](c10::ivalue::Future& allreduce_fut) {
    auto result = allreduce_fut.value();
    TORCH_INTERNAL_ASSERT(
        result.isTensorList(),
        "ProcessGroup::allreduce should return TensorList");

    auto reduce_tensor = result.toTensorVector()[0];
    TORCH_INTERNAL_ASSERT_DEBUG_ONLY(
        reduce_tensor.scalar_type() == at::ScalarType::Half,
        "Expected reduced tensor to be fp16 in FP16CompressHook, but got type ",
        reduce_tensor.scalar_type());
    decompressed_tensor.copy_(reduce_tensor);
    return c10::IValue(decompressed_tensor);
  };

  return allreduce_fut->then(decompress, allreduce_fut->elementType());
}

c10::intrusive_ptr<c10::ivalue::Future> _AllReduceBySumCommHook::runHook(
    GradBucket& bucket) {
  std::vector<at::Tensor> tensors = {bucket.getBufferRef()};
#ifdef IS_NCCLX
  // case with sparse_metadata_ set and using indices from there
  if (bucket.getSparseGradIndices().has_value()) {
    AllreduceOptions opts = AllreduceOptions();
    opts.sparseIndices = bucket.getSparseGradIndices().value();
    return state_->allreduce(tensors, opts)->getFuture();
  }
#else
  return state_->allreduce(tensors)->getFuture();
#endif
}

} // namespace c10d